You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/01/11 18:28:35 UTC
[2/5] storm git commit: refactor MapCombinerAggStateUpdater
refactor MapCombinerAggStateUpdater
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/48c52db6
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/48c52db6
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/48c52db6
Branch: refs/heads/master
Commit: 48c52db6ed0c67549ce4d0ad7e50ad2d35fe5f5a
Parents: 716a18d
Author: Xin Wang <be...@163.com>
Authored: Wed Dec 23 19:07:50 2015 +0800
Committer: Xin Wang <be...@163.com>
Committed: Wed Dec 23 19:07:50 2015 +0800
----------------------------------------------------------------------
.../trident/state/map/MapCombinerAggStateUpdater.java | 11 ++++++-----
1 file changed, 6 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/48c52db6/storm-core/src/jvm/storm/trident/state/map/MapCombinerAggStateUpdater.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/storm/trident/state/map/MapCombinerAggStateUpdater.java b/storm-core/src/jvm/storm/trident/state/map/MapCombinerAggStateUpdater.java
index 80a5ffe..cefaf32 100644
--- a/storm-core/src/jvm/storm/trident/state/map/MapCombinerAggStateUpdater.java
+++ b/storm-core/src/jvm/storm/trident/state/map/MapCombinerAggStateUpdater.java
@@ -33,24 +33,25 @@ import storm.trident.tuple.TridentTuple;
import storm.trident.tuple.TridentTupleView.ProjectionFactory;
public class MapCombinerAggStateUpdater implements StateUpdater<MapState> {
+ //ANY CHANGE TO THIS CODE MUST BE SERIALIZABLE COMPATIBLE OR THERE WILL BE PROBLEMS
+ private static final long serialVersionUID = 4783054195826968400L;
+
CombinerAggregator _agg;
Fields _groupFields;
Fields _inputFields;
ProjectionFactory _groupFactory;
ProjectionFactory _inputFactory;
ComboList.Factory _factory;
-
-
+
public MapCombinerAggStateUpdater(CombinerAggregator agg, Fields groupFields, Fields inputFields) {
_agg = agg;
_groupFields = groupFields;
_inputFields = inputFields;
- if(inputFields.size()!=1) {
+ if(inputFields.size() != 1) {
throw new IllegalArgumentException("Combiner aggs only take a single field as input. Got this instead: " + inputFields.toString());
}
_factory = new ComboList.Factory(groupFields.size(), inputFields.size());
}
-
@Override
public void updateState(MapState map, List<TridentTuple> tuples, TridentCollector collector) {
@@ -79,5 +80,5 @@ public class MapCombinerAggStateUpdater implements StateUpdater<MapState> {
@Override
public void cleanup() {
}
-
+
}