You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/12/01 23:05:25 UTC
[47/51] [partial] storm git commit: Update JStorm to latest release
2.1.0
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/coordination/BatchSubtopologyBuilder.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/coordination/BatchSubtopologyBuilder.java b/jstorm-core/src/main/java/backtype/storm/coordination/BatchSubtopologyBuilder.java
index 2a77f3b..ce4c955 100755
--- a/jstorm-core/src/main/java/backtype/storm/coordination/BatchSubtopologyBuilder.java
+++ b/jstorm-core/src/main/java/backtype/storm/coordination/BatchSubtopologyBuilder.java
@@ -42,87 +42,83 @@ public class BatchSubtopologyBuilder {
Map<String, Component> _bolts = new HashMap<String, Component>();
Component _masterBolt;
String _masterId;
-
+
public BatchSubtopologyBuilder(String masterBoltId, IBasicBolt masterBolt, Number boltParallelism) {
Integer p = boltParallelism == null ? null : boltParallelism.intValue();
_masterBolt = new Component(new BasicBoltExecutor(masterBolt), p);
_masterId = masterBoltId;
}
-
+
public BatchSubtopologyBuilder(String masterBoltId, IBasicBolt masterBolt) {
this(masterBoltId, masterBolt, null);
}
-
+
public BoltDeclarer getMasterDeclarer() {
return new BoltDeclarerImpl(_masterBolt);
}
-
+
public BoltDeclarer setBolt(String id, IBatchBolt bolt) {
return setBolt(id, bolt, null);
}
-
+
public BoltDeclarer setBolt(String id, IBatchBolt bolt, Number parallelism) {
return setBolt(id, new BatchBoltExecutor(bolt), parallelism);
- }
-
+ }
+
public BoltDeclarer setBolt(String id, IBasicBolt bolt) {
return setBolt(id, bolt, null);
- }
-
+ }
+
public BoltDeclarer setBolt(String id, IBasicBolt bolt, Number parallelism) {
return setBolt(id, new BasicBoltExecutor(bolt), parallelism);
}
-
+
private BoltDeclarer setBolt(String id, IRichBolt bolt, Number parallelism) {
Integer p = null;
- if(parallelism!=null) p = parallelism.intValue();
+ if (parallelism != null)
+ p = parallelism.intValue();
Component component = new Component(bolt, p);
_bolts.put(id, component);
return new BoltDeclarerImpl(component);
}
-
+
public void extendTopology(TopologyBuilder builder) {
BoltDeclarer declarer = builder.setBolt(_masterId, new CoordinatedBolt(_masterBolt.bolt), _masterBolt.parallelism);
- for(InputDeclaration decl: _masterBolt.declarations) {
+ for (InputDeclaration decl : _masterBolt.declarations) {
decl.declare(declarer);
}
- for(Map conf: _masterBolt.componentConfs) {
+ for (Map conf : _masterBolt.componentConfs) {
declarer.addConfigurations(conf);
}
- for(String id: _bolts.keySet()) {
+ for (String id : _bolts.keySet()) {
Component component = _bolts.get(id);
Map<String, SourceArgs> coordinatedArgs = new HashMap<String, SourceArgs>();
- for(String c: componentBoltSubscriptions(component)) {
+ for (String c : componentBoltSubscriptions(component)) {
SourceArgs source;
- if(c.equals(_masterId)) {
+ if (c.equals(_masterId)) {
source = SourceArgs.single();
} else {
source = SourceArgs.all();
}
- coordinatedArgs.put(c, source);
+ coordinatedArgs.put(c, source);
}
-
-
- BoltDeclarer input = builder.setBolt(id,
- new CoordinatedBolt(component.bolt,
- coordinatedArgs,
- null),
- component.parallelism);
- for(Map conf: component.componentConfs) {
+
+ BoltDeclarer input = builder.setBolt(id, new CoordinatedBolt(component.bolt, coordinatedArgs, null), component.parallelism);
+ for (Map conf : component.componentConfs) {
input.addConfigurations(conf);
}
- for(String c: componentBoltSubscriptions(component)) {
+ for (String c : componentBoltSubscriptions(component)) {
input.directGrouping(c, Constants.COORDINATED_STREAM_ID);
}
- for(InputDeclaration d: component.declarations) {
+ for (InputDeclaration d : component.declarations) {
d.declare(input);
}
- }
+ }
}
-
+
private Set<String> componentBoltSubscriptions(Component component) {
Set<String> ret = new HashSet<String>();
- for(InputDeclaration d: component.declarations) {
+ for (InputDeclaration d : component.declarations) {
ret.add(d.getComponent());
}
return ret;
@@ -133,25 +129,26 @@ public class BatchSubtopologyBuilder {
public Integer parallelism;
public List<InputDeclaration> declarations = new ArrayList<InputDeclaration>();
public List<Map> componentConfs = new ArrayList<Map>();
-
+
public Component(IRichBolt bolt, Integer parallelism) {
this.bolt = bolt;
this.parallelism = parallelism;
}
}
-
+
private static interface InputDeclaration {
void declare(InputDeclarer declarer);
+
String getComponent();
}
-
+
private class BoltDeclarerImpl extends BaseConfigurationDeclarer<BoltDeclarer> implements BoltDeclarer {
Component _component;
-
+
public BoltDeclarerImpl(Component component) {
_component = component;
}
-
+
@Override
public BoltDeclarer fieldsGrouping(final String component, final Fields fields) {
addDeclaration(new InputDeclaration() {
@@ -163,7 +160,7 @@ public class BatchSubtopologyBuilder {
@Override
public String getComponent() {
return component;
- }
+ }
});
return this;
}
@@ -174,12 +171,12 @@ public class BatchSubtopologyBuilder {
@Override
public void declare(InputDeclarer declarer) {
declarer.fieldsGrouping(component, streamId, fields);
- }
+ }
@Override
public String getComponent() {
return component;
- }
+ }
});
return this;
}
@@ -190,12 +187,12 @@ public class BatchSubtopologyBuilder {
@Override
public void declare(InputDeclarer declarer) {
declarer.globalGrouping(component);
- }
+ }
@Override
public String getComponent() {
return component;
- }
+ }
});
return this;
}
@@ -206,12 +203,12 @@ public class BatchSubtopologyBuilder {
@Override
public void declare(InputDeclarer declarer) {
declarer.globalGrouping(component, streamId);
- }
+ }
@Override
public String getComponent() {
return component;
- }
+ }
});
return this;
}
@@ -222,12 +219,12 @@ public class BatchSubtopologyBuilder {
@Override
public void declare(InputDeclarer declarer) {
declarer.shuffleGrouping(component);
- }
+ }
@Override
public String getComponent() {
return component;
- }
+ }
});
return this;
}
@@ -238,12 +235,12 @@ public class BatchSubtopologyBuilder {
@Override
public void declare(InputDeclarer declarer) {
declarer.shuffleGrouping(component, streamId);
- }
+ }
@Override
public String getComponent() {
return component;
- }
+ }
});
return this;
}
@@ -254,12 +251,12 @@ public class BatchSubtopologyBuilder {
@Override
public void declare(InputDeclarer declarer) {
declarer.localOrShuffleGrouping(component);
- }
+ }
@Override
public String getComponent() {
return component;
- }
+ }
});
return this;
}
@@ -270,8 +267,8 @@ public class BatchSubtopologyBuilder {
@Override
public void declare(InputDeclarer declarer) {
declarer.localOrShuffleGrouping(component, streamId);
- }
-
+ }
+
@Override
public String getComponent() {
return component;
@@ -279,7 +276,7 @@ public class BatchSubtopologyBuilder {
});
return this;
}
-
+
@Override
public BoltDeclarer localFirstGrouping(final String componentId) {
addDeclaration(new InputDeclaration() {
@@ -287,7 +284,7 @@ public class BatchSubtopologyBuilder {
public void declare(InputDeclarer declarer) {
declarer.localFirstGrouping(componentId);
}
-
+
@Override
public String getComponent() {
return componentId;
@@ -295,7 +292,7 @@ public class BatchSubtopologyBuilder {
});
return this;
}
-
+
@Override
public BoltDeclarer localFirstGrouping(final String component, final String streamId) {
addDeclaration(new InputDeclaration() {
@@ -303,27 +300,27 @@ public class BatchSubtopologyBuilder {
public void declare(InputDeclarer declarer) {
declarer.localFirstGrouping(component, streamId);
}
-
+
@Override
public String getComponent() {
return component;
- }
+ }
});
return this;
}
-
+
@Override
public BoltDeclarer noneGrouping(final String component) {
addDeclaration(new InputDeclaration() {
@Override
public void declare(InputDeclarer declarer) {
declarer.noneGrouping(component);
- }
+ }
@Override
public String getComponent() {
return component;
- }
+ }
});
return this;
}
@@ -334,12 +331,12 @@ public class BatchSubtopologyBuilder {
@Override
public void declare(InputDeclarer declarer) {
declarer.noneGrouping(component, streamId);
- }
+ }
@Override
public String getComponent() {
return component;
- }
+ }
});
return this;
}
@@ -350,12 +347,12 @@ public class BatchSubtopologyBuilder {
@Override
public void declare(InputDeclarer declarer) {
declarer.allGrouping(component);
- }
+ }
@Override
public String getComponent() {
return component;
- }
+ }
});
return this;
}
@@ -366,12 +363,12 @@ public class BatchSubtopologyBuilder {
@Override
public void declare(InputDeclarer declarer) {
declarer.allGrouping(component, streamId);
- }
+ }
@Override
public String getComponent() {
return component;
- }
+ }
});
return this;
}
@@ -382,12 +379,12 @@ public class BatchSubtopologyBuilder {
@Override
public void declare(InputDeclarer declarer) {
declarer.directGrouping(component);
- }
+ }
@Override
public String getComponent() {
return component;
- }
+ }
});
return this;
}
@@ -398,12 +395,12 @@ public class BatchSubtopologyBuilder {
@Override
public void declare(InputDeclarer declarer) {
declarer.directGrouping(component, streamId);
- }
+ }
@Override
public String getComponent() {
return component;
- }
+ }
});
return this;
}
@@ -417,21 +414,21 @@ public class BatchSubtopologyBuilder {
public BoltDeclarer partialKeyGrouping(String componentId, String streamId, Fields fields) {
return customGrouping(componentId, streamId, new PartialKeyGrouping(fields));
}
-
+
@Override
public BoltDeclarer customGrouping(final String component, final CustomStreamGrouping grouping) {
addDeclaration(new InputDeclaration() {
@Override
public void declare(InputDeclarer declarer) {
declarer.customGrouping(component, grouping);
- }
+ }
@Override
public String getComponent() {
return component;
- }
+ }
});
- return this;
+ return this;
}
@Override
@@ -440,12 +437,12 @@ public class BatchSubtopologyBuilder {
@Override
public void declare(InputDeclarer declarer) {
declarer.customGrouping(component, streamId, grouping);
- }
+ }
@Override
public String getComponent() {
return component;
- }
+ }
});
return this;
}
@@ -456,16 +453,16 @@ public class BatchSubtopologyBuilder {
@Override
public void declare(InputDeclarer declarer) {
declarer.grouping(stream, grouping);
- }
+ }
@Override
public String getComponent() {
return stream.get_componentId();
- }
+ }
});
return this;
}
-
+
private void addDeclaration(InputDeclaration declaration) {
_component.declarations.add(declaration);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/coordination/CoordinatedBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/coordination/CoordinatedBolt.java b/jstorm-core/src/main/java/backtype/storm/coordination/CoordinatedBolt.java
index 6f337a6..39a158d 100755
--- a/jstorm-core/src/main/java/backtype/storm/coordination/CoordinatedBolt.java
+++ b/jstorm-core/src/main/java/backtype/storm/coordination/CoordinatedBolt.java
@@ -45,8 +45,7 @@ import org.slf4j.LoggerFactory;
import static backtype.storm.utils.Utils.get;
/**
- * Coordination requires the request ids to be globally unique for awhile. This is so it doesn't get confused
- * in the case of retries.
+ * Coordination requires the request ids to be globally unique for awhile. This is so it doesn't get confused in the case of retries.
*/
public class CoordinatedBolt implements IRichBolt {
public static Logger LOG = LoggerFactory.getLogger(CoordinatedBolt.class);
@@ -58,8 +57,7 @@ public class CoordinatedBolt implements IRichBolt {
public static interface TimeoutCallback {
void timeoutId(Object id);
}
-
-
+
public static class SourceArgs implements Serializable {
public boolean singleCount;
@@ -74,7 +72,7 @@ public class CoordinatedBolt implements IRichBolt {
public static SourceArgs all() {
return new SourceArgs(false);
}
-
+
@Override
public String toString() {
return "<Single: " + singleCount + ">";
@@ -101,14 +99,14 @@ public class CoordinatedBolt implements IRichBolt {
public void ack(Tuple tuple) {
Object id = tuple.getValue(0);
- synchronized(_tracked) {
+ synchronized (_tracked) {
TrackingInfo track = _tracked.get(id);
if (track != null)
track.receivedTuples++;
}
boolean failed = checkFinishId(tuple, TupleType.REGULAR);
- if(failed) {
- _delegate.fail(tuple);
+ if (failed) {
+ _delegate.fail(tuple);
} else {
_delegate.ack(tuple);
}
@@ -116,7 +114,7 @@ public class CoordinatedBolt implements IRichBolt {
public void fail(Tuple tuple) {
Object id = tuple.getValue(0);
- synchronized(_tracked) {
+ synchronized (_tracked) {
TrackingInfo track = _tracked.get(id);
if (track != null)
track.failed = true;
@@ -124,18 +122,17 @@ public class CoordinatedBolt implements IRichBolt {
checkFinishId(tuple, TupleType.REGULAR);
_delegate.fail(tuple);
}
-
+
public void reportError(Throwable error) {
_delegate.reportError(error);
}
-
private void updateTaskCounts(Object id, List<Integer> tasks) {
- synchronized(_tracked) {
+ synchronized (_tracked) {
TrackingInfo track = _tracked.get(id);
if (track != null) {
Map<Integer, Integer> taskEmittedTuples = track.taskEmittedTuples;
- for(Integer task: tasks) {
+ for (Integer task : tasks) {
int newCount = get(taskEmittedTuples, task, 0) + 1;
taskEmittedTuples.put(task, newCount);
}
@@ -161,34 +158,30 @@ public class CoordinatedBolt implements IRichBolt {
boolean receivedId = false;
boolean finished = false;
List<Tuple> ackTuples = new ArrayList<Tuple>();
-
+
@Override
public String toString() {
- return "reportCount: " + reportCount + "\n" +
- "expectedTupleCount: " + expectedTupleCount + "\n" +
- "receivedTuples: " + receivedTuples + "\n" +
- "failed: " + failed + "\n" +
- taskEmittedTuples.toString();
+ return "reportCount: " + reportCount + "\n" + "expectedTupleCount: " + expectedTupleCount + "\n" + "receivedTuples: " + receivedTuples + "\n"
+ + "failed: " + failed + "\n" + taskEmittedTuples.toString();
}
}
-
public static class IdStreamSpec implements Serializable {
GlobalStreamId _id;
-
+
public GlobalStreamId getGlobalStreamId() {
return _id;
}
public static IdStreamSpec makeDetectSpec(String component, String stream) {
return new IdStreamSpec(component, stream);
- }
-
+ }
+
protected IdStreamSpec(String component, String stream) {
_id = new GlobalStreamId(component, stream);
}
}
-
+
public CoordinatedBolt(IRichBolt delegate) {
this(delegate, null, null);
}
@@ -196,37 +189,35 @@ public class CoordinatedBolt implements IRichBolt {
public CoordinatedBolt(IRichBolt delegate, String sourceComponent, SourceArgs sourceArgs, IdStreamSpec idStreamSpec) {
this(delegate, singleSourceArgs(sourceComponent, sourceArgs), idStreamSpec);
}
-
+
public CoordinatedBolt(IRichBolt delegate, Map<String, SourceArgs> sourceArgs, IdStreamSpec idStreamSpec) {
_sourceArgs = sourceArgs;
- if(_sourceArgs==null) _sourceArgs = new HashMap<String, SourceArgs>();
+ if (_sourceArgs == null)
+ _sourceArgs = new HashMap<String, SourceArgs>();
_delegate = delegate;
_idStreamSpec = idStreamSpec;
}
-
+
public void prepare(Map config, TopologyContext context, OutputCollector collector) {
TimeCacheMap.ExpiredCallback<Object, TrackingInfo> callback = null;
- if(_delegate instanceof TimeoutCallback) {
+ if (_delegate instanceof TimeoutCallback) {
callback = new TimeoutItems();
}
_tracked = new TimeCacheMap<Object, TrackingInfo>(context.maxTopologyMessageTimeout(), callback);
_collector = collector;
_delegate.prepare(config, context, new OutputCollector(new CoordinatedOutputCollector(collector)));
- for(String component: Utils.get(context.getThisTargets(),
- Constants.COORDINATED_STREAM_ID,
- new HashMap<String, Grouping>())
- .keySet()) {
- for(Integer task: context.getComponentTasks(component)) {
+ for (String component : Utils.get(context.getThisTargets(), Constants.COORDINATED_STREAM_ID, new HashMap<String, Grouping>()).keySet()) {
+ for (Integer task : context.getComponentTasks(component)) {
_countOutTasks.add(task);
}
}
- if(!_sourceArgs.isEmpty()) {
+ if (!_sourceArgs.isEmpty()) {
_numSourceReports = 0;
- for(Entry<String, SourceArgs> entry: _sourceArgs.entrySet()) {
- if(entry.getValue().singleCount) {
- _numSourceReports+=1;
+ for (Entry<String, SourceArgs> entry : _sourceArgs.entrySet()) {
+ if (entry.getValue().singleCount) {
+ _numSourceReports += 1;
} else {
- _numSourceReports+=context.getComponentTasks(entry.getKey()).size();
+ _numSourceReports += context.getComponentTasks(entry.getKey()).size();
}
}
}
@@ -235,57 +226,56 @@ public class CoordinatedBolt implements IRichBolt {
private boolean checkFinishId(Tuple tup, TupleType type) {
Object id = tup.getValue(0);
boolean failed = false;
-
- synchronized(_tracked) {
+
+ synchronized (_tracked) {
TrackingInfo track = _tracked.get(id);
try {
- if(track!=null) {
+ if (track != null) {
boolean delayed = false;
- if(_idStreamSpec==null && type == TupleType.COORD || _idStreamSpec!=null && type==TupleType.ID) {
+ if (_idStreamSpec == null && type == TupleType.COORD || _idStreamSpec != null && type == TupleType.ID) {
track.ackTuples.add(tup);
delayed = true;
}
- if(track.failed) {
+ if (track.failed) {
failed = true;
- for(Tuple t: track.ackTuples) {
+ for (Tuple t : track.ackTuples) {
_collector.fail(t);
}
_tracked.remove(id);
- } else if(track.receivedId
- && (_sourceArgs.isEmpty() ||
- track.reportCount==_numSourceReports &&
- track.expectedTupleCount == track.receivedTuples)){
- if(_delegate instanceof FinishedCallback) {
- ((FinishedCallback)_delegate).finishedId(id);
+ } else if (track.receivedId
+ && (_sourceArgs.isEmpty() || track.reportCount == _numSourceReports && track.expectedTupleCount == track.receivedTuples)) {
+ if (_delegate instanceof FinishedCallback) {
+ ((FinishedCallback) _delegate).finishedId(id);
}
- if(!(_sourceArgs.isEmpty() || type!=TupleType.REGULAR)) {
+ if (!(_sourceArgs.isEmpty() || type != TupleType.REGULAR)) {
throw new IllegalStateException("Coordination condition met on a non-coordinating tuple. Should be impossible");
}
Iterator<Integer> outTasks = _countOutTasks.iterator();
- while(outTasks.hasNext()) {
+ while (outTasks.hasNext()) {
int task = outTasks.next();
int numTuples = get(track.taskEmittedTuples, task, 0);
_collector.emitDirect(task, Constants.COORDINATED_STREAM_ID, tup, new Values(id, numTuples));
}
- for(Tuple t: track.ackTuples) {
+ for (Tuple t : track.ackTuples) {
_collector.ack(t);
}
track.finished = true;
_tracked.remove(id);
}
- if(!delayed && type!=TupleType.REGULAR) {
- if(track.failed) {
+ if (!delayed && type != TupleType.REGULAR) {
+ if (track.failed) {
_collector.fail(tup);
} else {
- _collector.ack(tup);
+ _collector.ack(tup);
}
}
} else {
- if(type!=TupleType.REGULAR) _collector.fail(tup);
+ if (type != TupleType.REGULAR)
+ _collector.fail(tup);
}
- } catch(FailedException e) {
+ } catch (FailedException e) {
LOG.error("Failed to finish batch", e);
- for(Tuple t: track.ackTuples) {
+ for (Tuple t : track.ackTuples) {
_collector.fail(t);
}
_tracked.remove(id);
@@ -299,29 +289,30 @@ public class CoordinatedBolt implements IRichBolt {
Object id = tuple.getValue(0);
TrackingInfo track;
TupleType type = getTupleType(tuple);
- synchronized(_tracked) {
+ synchronized (_tracked) {
track = _tracked.get(id);
- if(track==null) {
+ if (track == null) {
track = new TrackingInfo();
- if(_idStreamSpec==null) track.receivedId = true;
+ if (_idStreamSpec == null)
+ track.receivedId = true;
_tracked.put(id, track);
}
}
-
- if(type==TupleType.ID) {
- synchronized(_tracked) {
+
+ if (type == TupleType.ID) {
+ synchronized (_tracked) {
track.receivedId = true;
}
- checkFinishId(tuple, type);
- } else if(type==TupleType.COORD) {
+ checkFinishId(tuple, type);
+ } else if (type == TupleType.COORD) {
int count = (Integer) tuple.getValue(1);
- synchronized(_tracked) {
+ synchronized (_tracked) {
track.reportCount++;
- track.expectedTupleCount+=count;
+ track.expectedTupleCount += count;
}
checkFinishId(tuple, type);
- } else {
- synchronized(_tracked) {
+ } else {
+ synchronized (_tracked) {
_delegate.execute(tuple);
}
}
@@ -341,42 +332,38 @@ public class CoordinatedBolt implements IRichBolt {
public Map<String, Object> getComponentConfiguration() {
return _delegate.getComponentConfiguration();
}
-
+
private static Map<String, SourceArgs> singleSourceArgs(String sourceComponent, SourceArgs sourceArgs) {
Map<String, SourceArgs> ret = new HashMap<String, SourceArgs>();
ret.put(sourceComponent, sourceArgs);
return ret;
}
-
+
private class TimeoutItems implements TimeCacheMap.ExpiredCallback<Object, TrackingInfo> {
@Override
public void expire(Object id, TrackingInfo val) {
- synchronized(_tracked) {
+ synchronized (_tracked) {
// the combination of the lock and the finished flag ensure that
// an id is never timed out if it has been finished
val.failed = true;
- if(!val.finished) {
+ if (!val.finished) {
((TimeoutCallback) _delegate).timeoutId(id);
}
}
}
}
-
+
private TupleType getTupleType(Tuple tuple) {
- if(_idStreamSpec!=null
- && tuple.getSourceGlobalStreamid().equals(_idStreamSpec._id)) {
+ if (_idStreamSpec != null && tuple.getSourceGlobalStreamid().equals(_idStreamSpec._id)) {
return TupleType.ID;
- } else if(!_sourceArgs.isEmpty()
- && tuple.getSourceStreamId().equals(Constants.COORDINATED_STREAM_ID)) {
+ } else if (!_sourceArgs.isEmpty() && tuple.getSourceStreamId().equals(Constants.COORDINATED_STREAM_ID)) {
return TupleType.COORD;
} else {
return TupleType.REGULAR;
}
}
-
+
static enum TupleType {
- REGULAR,
- ID,
- COORD
+ REGULAR, ID, COORD
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/coordination/IBatchBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/coordination/IBatchBolt.java b/jstorm-core/src/main/java/backtype/storm/coordination/IBatchBolt.java
index ee5d9bd..9a1abfa 100755
--- a/jstorm-core/src/main/java/backtype/storm/coordination/IBatchBolt.java
+++ b/jstorm-core/src/main/java/backtype/storm/coordination/IBatchBolt.java
@@ -25,6 +25,8 @@ import java.util.Map;
public interface IBatchBolt<T> extends Serializable, IComponent {
void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, T id);
+
void execute(Tuple tuple);
+
void finishBatch();
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/drpc/DRPCInvocationsClient.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/drpc/DRPCInvocationsClient.java b/jstorm-core/src/main/java/backtype/storm/drpc/DRPCInvocationsClient.java
index 624db3e..d10872f 100755
--- a/jstorm-core/src/main/java/backtype/storm/drpc/DRPCInvocationsClient.java
+++ b/jstorm-core/src/main/java/backtype/storm/drpc/DRPCInvocationsClient.java
@@ -17,23 +17,22 @@
*/
package backtype.storm.drpc;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicReference;
-
+import backtype.storm.generated.AuthorizationException;
import backtype.storm.generated.DRPCRequest;
import backtype.storm.generated.DistributedRPCInvocations;
-import backtype.storm.generated.AuthorizationException;
import backtype.storm.security.auth.ThriftClient;
import backtype.storm.security.auth.ThriftConnectionType;
-import org.apache.thrift.transport.TTransportException;
import org.apache.thrift.TException;
+import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
public class DRPCInvocationsClient extends ThriftClient implements DistributedRPCInvocations.Iface {
public static Logger LOG = LoggerFactory.getLogger(DRPCInvocationsClient.class);
- private final AtomicReference<DistributedRPCInvocations.Client> client =
- new AtomicReference<DistributedRPCInvocations.Client>();
+ private final AtomicReference<DistributedRPCInvocations.Client> client = new AtomicReference<DistributedRPCInvocations.Client>();
private String host;
private int port;
@@ -43,14 +42,14 @@ public class DRPCInvocationsClient extends ThriftClient implements DistributedRP
this.port = port;
client.set(new DistributedRPCInvocations.Client(_protocol));
}
-
+
public String getHost() {
return host;
}
-
+
public int getPort() {
return port;
- }
+ }
public void reconnectClient() throws TException {
if (client.get() == null) {
@@ -70,9 +69,9 @@ public class DRPCInvocationsClient extends ThriftClient implements DistributedRP
throw new TException("Client is not connected...");
}
c.result(id, result);
- } catch(AuthorizationException aze) {
+ } catch (AuthorizationException aze) {
throw aze;
- } catch(TException e) {
+ } catch (TException e) {
client.compareAndSet(c, null);
throw e;
}
@@ -85,24 +84,24 @@ public class DRPCInvocationsClient extends ThriftClient implements DistributedRP
throw new TException("Client is not connected...");
}
return c.fetchRequest(func);
- } catch(AuthorizationException aze) {
+ } catch (AuthorizationException aze) {
throw aze;
- } catch(TException e) {
+ } catch (TException e) {
client.compareAndSet(c, null);
throw e;
}
- }
+ }
- public void failRequest(String id) throws TException, AuthorizationException {
+ public void failRequest(String id) throws TException {
DistributedRPCInvocations.Client c = client.get();
try {
if (c == null) {
throw new TException("Client is not connected...");
}
c.failRequest(id);
- } catch(AuthorizationException aze) {
+ } catch (AuthorizationException aze) {
throw aze;
- } catch(TException e) {
+ } catch (TException e) {
client.compareAndSet(c, null);
throw e;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/drpc/DRPCSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/drpc/DRPCSpout.java b/jstorm-core/src/main/java/backtype/storm/drpc/DRPCSpout.java
index 4ed24d4..c490efd 100644
--- a/jstorm-core/src/main/java/backtype/storm/drpc/DRPCSpout.java
+++ b/jstorm-core/src/main/java/backtype/storm/drpc/DRPCSpout.java
@@ -17,25 +17,6 @@
*/
package backtype.storm.drpc;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.thrift.TException;
-import org.json.simple.JSONValue;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.alibaba.jstorm.utils.NetWorkUtils;
-
import backtype.storm.Config;
import backtype.storm.ILocalDRPC;
import backtype.storm.generated.AuthorizationException;
@@ -50,31 +31,38 @@ import backtype.storm.tuple.Values;
import backtype.storm.utils.ExtendedThreadPoolExecutor;
import backtype.storm.utils.ServiceRegistry;
import backtype.storm.utils.Utils;
+import com.alibaba.jstorm.utils.NetWorkUtils;
+import org.apache.thrift.TException;
+import org.json.simple.JSONValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.concurrent.*;
public class DRPCSpout extends BaseRichSpout {
- //ANY CHANGE TO THIS CODE MUST BE SERIALIZABLE COMPATIBLE OR THERE WILL BE PROBLEMS
+ // ANY CHANGE TO THIS CODE MUST BE SERIALIZABLE COMPATIBLE OR THERE WILL BE PROBLEMS
static final long serialVersionUID = 2387848310969237877L;
public static Logger LOG = LoggerFactory.getLogger(DRPCSpout.class);
-
+
SpoutOutputCollector _collector;
List<DRPCInvocationsClient> _clients = new ArrayList<DRPCInvocationsClient>();
transient LinkedList<Future<Void>> _futures = null;
transient ExecutorService _backround = null;
String _function;
String _local_drpc_id = null;
-
+
private static class DRPCMessageId {
String id;
int index;
-
+
public DRPCMessageId(String id, int index) {
this.id = id;
this.index = index;
}
}
-
-
+
public DRPCSpout(String function) {
_function = function;
}
@@ -83,7 +71,7 @@ public class DRPCSpout extends BaseRichSpout {
_function = function;
_local_drpc_id = drpc.getServiceId();
}
-
+
private class Adder implements Callable<Void> {
private String server;
private int port;
@@ -129,16 +117,12 @@ public class DRPCSpout extends BaseRichSpout {
}
}
}
-
-
-
+
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
_collector = collector;
- if(_local_drpc_id==null) {
- _backround = new ExtendedThreadPoolExecutor(0, Integer.MAX_VALUE,
- 60L, TimeUnit.SECONDS,
- new SynchronousQueue<Runnable>());
+ if (_local_drpc_id == null) {
+ _backround = new ExtendedThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
_futures = new LinkedList<Future<Void>>();
int numTasks = context.getComponentTasks(context.getThisComponentId()).size();
@@ -146,26 +130,26 @@ public class DRPCSpout extends BaseRichSpout {
int port = Utils.getInt(conf.get(Config.DRPC_INVOCATIONS_PORT));
List<String> servers = NetWorkUtils.host2Ip((List<String>) conf.get(Config.DRPC_SERVERS));
-
- if(servers == null || servers.isEmpty()) {
- throw new RuntimeException("No DRPC servers configured for topology");
+
+ if (servers == null || servers.isEmpty()) {
+ throw new RuntimeException("No DRPC servers configured for topology");
}
-
+
if (numTasks < servers.size()) {
- for (String s: servers) {
+ for (String s : servers) {
_futures.add(_backround.submit(new Adder(s, port, conf)));
}
- } else {
+ } else {
int i = index % servers.size();
_futures.add(_backround.submit(new Adder(servers.get(i), port, conf)));
}
}
-
+
}
@Override
public void close() {
- for(DRPCInvocationsClient client: _clients) {
+ for (DRPCInvocationsClient client : _clients) {
client.close();
}
}
@@ -173,12 +157,12 @@ public class DRPCSpout extends BaseRichSpout {
@Override
public void nextTuple() {
boolean gotRequest = false;
- if(_local_drpc_id==null) {
+ if (_local_drpc_id == null) {
int size = 0;
synchronized (_clients) {
- size = _clients.size(); //This will only ever grow, so no need to worry about falling off the end
+ size = _clients.size(); // This will only ever grow, so no need to worry about falling off the end
}
- for(int i=0; i<size; i++) {
+ for (int i = 0; i < size; i++) {
DRPCInvocationsClient client;
synchronized (_clients) {
client = _clients.get(i);
@@ -188,7 +172,7 @@ public class DRPCSpout extends BaseRichSpout {
}
try {
DRPCRequest req = client.fetchRequest(_function);
- if(req.get_request_id().length() > 0) {
+ if (req.get_request_id().length() > 0) {
Map returnInfo = new HashMap();
returnInfo.put("id", req.get_request_id());
returnInfo.put("host", client.getHost());
@@ -210,10 +194,10 @@ public class DRPCSpout extends BaseRichSpout {
checkFutures();
} else {
DistributedRPCInvocations.Iface drpc = (DistributedRPCInvocations.Iface) ServiceRegistry.getService(_local_drpc_id);
- if(drpc!=null) { // can happen during shutdown of drpc while topology is still up
+ if (drpc != null) { // can happen during shutdown of drpc while topology is still up
try {
DRPCRequest req = drpc.fetchRequest(_function);
- if(req.get_request_id().length() > 0) {
+ if (req.get_request_id().length() > 0) {
Map returnInfo = new HashMap();
returnInfo.put("id", req.get_request_id());
returnInfo.put("host", _local_drpc_id);
@@ -228,7 +212,7 @@ public class DRPCSpout extends BaseRichSpout {
}
}
}
- if(!gotRequest) {
+ if (!gotRequest) {
Utils.sleep(1);
}
}
@@ -241,8 +225,8 @@ public class DRPCSpout extends BaseRichSpout {
public void fail(Object msgId) {
DRPCMessageId did = (DRPCMessageId) msgId;
DistributedRPCInvocations.Iface client;
-
- if(_local_drpc_id == null) {
+
+ if (_local_drpc_id == null) {
client = _clients.get(did.index);
} else {
client = (DistributedRPCInvocations.Iface) ServiceRegistry.getService(_local_drpc_id);
@@ -259,5 +243,5 @@ public class DRPCSpout extends BaseRichSpout {
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("args", "return-info"));
- }
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/drpc/JoinResult.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/drpc/JoinResult.java b/jstorm-core/src/main/java/backtype/storm/drpc/JoinResult.java
index b74b97e..e9195e7 100755
--- a/jstorm-core/src/main/java/backtype/storm/drpc/JoinResult.java
+++ b/jstorm-core/src/main/java/backtype/storm/drpc/JoinResult.java
@@ -31,7 +31,6 @@ import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
public class JoinResult extends BaseRichBolt {
public static Logger LOG = LoggerFactory.getLogger(JoinResult.class);
@@ -43,27 +42,27 @@ public class JoinResult extends BaseRichBolt {
public JoinResult(String returnComponent) {
this.returnComponent = returnComponent;
}
-
+
public void prepare(Map map, TopologyContext context, OutputCollector collector) {
_collector = collector;
}
public void execute(Tuple tuple) {
Object requestId = tuple.getValue(0);
- if(tuple.getSourceComponent().equals(returnComponent)) {
+ if (tuple.getSourceComponent().equals(returnComponent)) {
returns.put(requestId, tuple);
} else {
results.put(requestId, tuple);
}
- if(returns.containsKey(requestId) && results.containsKey(requestId)) {
+ if (returns.containsKey(requestId) && results.containsKey(requestId)) {
Tuple result = results.remove(requestId);
Tuple returner = returns.remove(requestId);
LOG.debug(result.getValue(1).toString());
List<Tuple> anchors = new ArrayList<Tuple>();
anchors.add(result);
- anchors.add(returner);
- _collector.emit(anchors, new Values(""+result.getValue(1), returner.getValue(1)));
+ anchors.add(returner);
+ _collector.emit(anchors, new Values("" + result.getValue(1), returner.getValue(1)));
_collector.ack(result);
_collector.ack(returner);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/drpc/KeyedFairBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/drpc/KeyedFairBolt.java b/jstorm-core/src/main/java/backtype/storm/drpc/KeyedFairBolt.java
index 113163d..2294c54 100755
--- a/jstorm-core/src/main/java/backtype/storm/drpc/KeyedFairBolt.java
+++ b/jstorm-core/src/main/java/backtype/storm/drpc/KeyedFairBolt.java
@@ -29,7 +29,6 @@ import backtype.storm.utils.KeyedRoundRobinQueue;
import java.util.HashMap;
import java.util.Map;
-
public class KeyedFairBolt implements IRichBolt, FinishedCallback {
IRichBolt _delegate;
KeyedRoundRobinQueue<Tuple> _rrQueue;
@@ -39,14 +38,13 @@ public class KeyedFairBolt implements IRichBolt, FinishedCallback {
public KeyedFairBolt(IRichBolt delegate) {
_delegate = delegate;
}
-
+
public KeyedFairBolt(IBasicBolt delegate) {
this(new BasicBoltExecutor(delegate));
}
-
-
+
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
- if(_delegate instanceof FinishedCallback) {
+ if (_delegate instanceof FinishedCallback) {
_callback = (FinishedCallback) _delegate;
}
_delegate.prepare(stormConf, context, collector);
@@ -54,7 +52,7 @@ public class KeyedFairBolt implements IRichBolt, FinishedCallback {
_executor = new Thread(new Runnable() {
public void run() {
try {
- while(true) {
+ while (true) {
_delegate.execute(_rrQueue.take());
}
} catch (InterruptedException e) {
@@ -81,7 +79,7 @@ public class KeyedFairBolt implements IRichBolt, FinishedCallback {
}
public void finishedId(Object id) {
- if(_callback!=null) {
+ if (_callback != null) {
_callback.finishedId(id);
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/drpc/LinearDRPCInputDeclarer.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/drpc/LinearDRPCInputDeclarer.java b/jstorm-core/src/main/java/backtype/storm/drpc/LinearDRPCInputDeclarer.java
index d03075e..ddcac35 100755
--- a/jstorm-core/src/main/java/backtype/storm/drpc/LinearDRPCInputDeclarer.java
+++ b/jstorm-core/src/main/java/backtype/storm/drpc/LinearDRPCInputDeclarer.java
@@ -23,30 +23,39 @@ import backtype.storm.tuple.Fields;
public interface LinearDRPCInputDeclarer extends ComponentConfigurationDeclarer<LinearDRPCInputDeclarer> {
public LinearDRPCInputDeclarer fieldsGrouping(Fields fields);
+
public LinearDRPCInputDeclarer fieldsGrouping(String streamId, Fields fields);
public LinearDRPCInputDeclarer globalGrouping();
+
public LinearDRPCInputDeclarer globalGrouping(String streamId);
public LinearDRPCInputDeclarer shuffleGrouping();
+
public LinearDRPCInputDeclarer shuffleGrouping(String streamId);
public LinearDRPCInputDeclarer localOrShuffleGrouping();
+
public LinearDRPCInputDeclarer localOrShuffleGrouping(String streamId);
-
+
public LinearDRPCInputDeclarer noneGrouping();
+
public LinearDRPCInputDeclarer noneGrouping(String streamId);
public LinearDRPCInputDeclarer allGrouping();
+
public LinearDRPCInputDeclarer allGrouping(String streamId);
public LinearDRPCInputDeclarer directGrouping();
+
public LinearDRPCInputDeclarer directGrouping(String streamId);
public LinearDRPCInputDeclarer partialKeyGrouping(Fields fields);
+
public LinearDRPCInputDeclarer partialKeyGrouping(String streamId, Fields fields);
public LinearDRPCInputDeclarer customGrouping(CustomStreamGrouping grouping);
+
public LinearDRPCInputDeclarer customGrouping(String streamId, CustomStreamGrouping grouping);
-
+
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/drpc/LinearDRPCTopologyBuilder.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/drpc/LinearDRPCTopologyBuilder.java b/jstorm-core/src/main/java/backtype/storm/drpc/LinearDRPCTopologyBuilder.java
index ebbbc6d..e8c202e 100755
--- a/jstorm-core/src/main/java/backtype/storm/drpc/LinearDRPCTopologyBuilder.java
+++ b/jstorm-core/src/main/java/backtype/storm/drpc/LinearDRPCTopologyBuilder.java
@@ -43,39 +43,38 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-
// Trident subsumes the functionality provided by this class, so it's deprecated
@Deprecated
-public class LinearDRPCTopologyBuilder {
+public class LinearDRPCTopologyBuilder {
String _function;
List<Component> _components = new ArrayList<Component>();
-
-
+
public LinearDRPCTopologyBuilder(String function) {
_function = function;
}
-
+
public LinearDRPCInputDeclarer addBolt(IBatchBolt bolt, Number parallelism) {
return addBolt(new BatchBoltExecutor(bolt), parallelism);
}
-
+
public LinearDRPCInputDeclarer addBolt(IBatchBolt bolt) {
return addBolt(bolt, 1);
}
-
+
@Deprecated
public LinearDRPCInputDeclarer addBolt(IRichBolt bolt, Number parallelism) {
- if(parallelism==null) parallelism = 1;
+ if (parallelism == null)
+ parallelism = 1;
Component component = new Component(bolt, parallelism.intValue());
_components.add(component);
return new InputDeclarerImpl(component);
}
-
+
@Deprecated
public LinearDRPCInputDeclarer addBolt(IRichBolt bolt) {
return addBolt(bolt, null);
}
-
+
public LinearDRPCInputDeclarer addBolt(IBasicBolt bolt, Number parallelism) {
return addBolt(new BasicBoltExecutor(bolt), parallelism);
}
@@ -83,125 +82,119 @@ public class LinearDRPCTopologyBuilder {
public LinearDRPCInputDeclarer addBolt(IBasicBolt bolt) {
return addBolt(bolt, null);
}
-
+
public StormTopology createLocalTopology(ILocalDRPC drpc) {
return createTopology(new DRPCSpout(_function, drpc));
}
-
+
public StormTopology createRemoteTopology() {
return createTopology(new DRPCSpout(_function));
}
-
-
+
private StormTopology createTopology(DRPCSpout spout) {
final String SPOUT_ID = "spout";
final String PREPARE_ID = "prepare-request";
-
+
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(SPOUT_ID, spout);
- builder.setBolt(PREPARE_ID, new PrepareRequest())
- .noneGrouping(SPOUT_ID);
- int i=0;
- for(; i<_components.size();i++) {
+ builder.setBolt(PREPARE_ID, new PrepareRequest()).noneGrouping(SPOUT_ID);
+ int i = 0;
+ for (; i < _components.size(); i++) {
Component component = _components.get(i);
-
+
Map<String, SourceArgs> source = new HashMap<String, SourceArgs>();
- if (i==1) {
- source.put(boltId(i-1), SourceArgs.single());
- } else if (i>=2) {
- source.put(boltId(i-1), SourceArgs.all());
+ if (i == 1) {
+ source.put(boltId(i - 1), SourceArgs.single());
+ } else if (i >= 2) {
+ source.put(boltId(i - 1), SourceArgs.all());
}
IdStreamSpec idSpec = null;
- if(i==_components.size()-1 && component.bolt instanceof FinishedCallback) {
+ if (i == _components.size() - 1 && component.bolt instanceof FinishedCallback) {
idSpec = IdStreamSpec.makeDetectSpec(PREPARE_ID, PrepareRequest.ID_STREAM);
}
- BoltDeclarer declarer = builder.setBolt(
- boltId(i),
- new CoordinatedBolt(component.bolt, source, idSpec),
- component.parallelism);
-
- for(Map conf: component.componentConfs) {
+ BoltDeclarer declarer = builder.setBolt(boltId(i), new CoordinatedBolt(component.bolt, source, idSpec), component.parallelism);
+
+ for (Map conf : component.componentConfs) {
declarer.addConfigurations(conf);
}
-
- if(idSpec!=null) {
+
+ if (idSpec != null) {
declarer.fieldsGrouping(idSpec.getGlobalStreamId().get_componentId(), PrepareRequest.ID_STREAM, new Fields("request"));
}
- if(i==0 && component.declarations.isEmpty()) {
+ if (i == 0 && component.declarations.isEmpty()) {
declarer.noneGrouping(PREPARE_ID, PrepareRequest.ARGS_STREAM);
} else {
String prevId;
- if(i==0) {
+ if (i == 0) {
prevId = PREPARE_ID;
} else {
- prevId = boltId(i-1);
+ prevId = boltId(i - 1);
}
- for(InputDeclaration declaration: component.declarations) {
+ for (InputDeclaration declaration : component.declarations) {
declaration.declare(prevId, declarer);
}
}
- if(i>0) {
- declarer.directGrouping(boltId(i-1), Constants.COORDINATED_STREAM_ID);
+ if (i > 0) {
+ declarer.directGrouping(boltId(i - 1), Constants.COORDINATED_STREAM_ID);
}
}
-
- IRichBolt lastBolt = _components.get(_components.size()-1).bolt;
+
+ IRichBolt lastBolt = _components.get(_components.size() - 1).bolt;
OutputFieldsGetter getter = new OutputFieldsGetter();
lastBolt.declareOutputFields(getter);
Map<String, StreamInfo> streams = getter.getFieldsDeclaration();
- if(streams.size()!=1) {
+ if (streams.size() != 1) {
throw new RuntimeException("Must declare exactly one stream from last bolt in LinearDRPCTopology");
}
String outputStream = streams.keySet().iterator().next();
List<String> fields = streams.get(outputStream).get_output_fields();
- if(fields.size()!=2) {
- throw new RuntimeException("Output stream of last component in LinearDRPCTopology must contain exactly two fields. The first should be the request id, and the second should be the result.");
+ if (fields.size() != 2) {
+ throw new RuntimeException(
+ "Output stream of last component in LinearDRPCTopology must contain exactly two fields. The first should be the request id, and the second should be the result.");
}
- builder.setBolt("JoinResult", new JoinResult(PREPARE_ID))
- .fieldsGrouping(boltId(i-1), outputStream, new Fields(fields.get(0)))
+ builder.setBolt("JoinResult", new JoinResult(PREPARE_ID)).fieldsGrouping(boltId(i - 1), outputStream, new Fields(fields.get(0)))
.fieldsGrouping(PREPARE_ID, PrepareRequest.RETURN_STREAM, new Fields("request"));
i++;
- builder.setBolt("ReturnResults", new ReturnResults())
- .noneGrouping("JoinResult");
+ builder.setBolt("ReturnResults", new ReturnResults()).noneGrouping("JoinResult");
return builder.createTopology();
}
-
+
private static String boltId(int index) {
return "bolt" + index;
}
-
+
private static class Component {
public IRichBolt bolt;
public int parallelism;
public List<Map> componentConfs;
public List<InputDeclaration> declarations = new ArrayList<InputDeclaration>();
-
+
public Component(IRichBolt bolt, int parallelism) {
this.bolt = bolt;
this.parallelism = parallelism;
this.componentConfs = new ArrayList();
}
}
-
+
private static interface InputDeclaration {
public void declare(String prevComponent, InputDeclarer declarer);
}
-
+
private class InputDeclarerImpl extends BaseConfigurationDeclarer<LinearDRPCInputDeclarer> implements LinearDRPCInputDeclarer {
Component _component;
-
+
public InputDeclarerImpl(Component component) {
_component = component;
}
-
+
@Override
public LinearDRPCInputDeclarer fieldsGrouping(final Fields fields) {
addDeclaration(new InputDeclaration() {
@Override
public void declare(String prevComponent, InputDeclarer declarer) {
declarer.fieldsGrouping(prevComponent, fields);
- }
+ }
});
return this;
}
@@ -212,7 +205,7 @@ public class LinearDRPCTopologyBuilder {
@Override
public void declare(String prevComponent, InputDeclarer declarer) {
declarer.fieldsGrouping(prevComponent, streamId, fields);
- }
+ }
});
return this;
}
@@ -223,7 +216,7 @@ public class LinearDRPCTopologyBuilder {
@Override
public void declare(String prevComponent, InputDeclarer declarer) {
declarer.globalGrouping(prevComponent);
- }
+ }
});
return this;
}
@@ -234,7 +227,7 @@ public class LinearDRPCTopologyBuilder {
@Override
public void declare(String prevComponent, InputDeclarer declarer) {
declarer.globalGrouping(prevComponent, streamId);
- }
+ }
});
return this;
}
@@ -245,7 +238,7 @@ public class LinearDRPCTopologyBuilder {
@Override
public void declare(String prevComponent, InputDeclarer declarer) {
declarer.shuffleGrouping(prevComponent);
- }
+ }
});
return this;
}
@@ -256,7 +249,7 @@ public class LinearDRPCTopologyBuilder {
@Override
public void declare(String prevComponent, InputDeclarer declarer) {
declarer.shuffleGrouping(prevComponent, streamId);
- }
+ }
});
return this;
}
@@ -267,7 +260,7 @@ public class LinearDRPCTopologyBuilder {
@Override
public void declare(String prevComponent, InputDeclarer declarer) {
declarer.localOrShuffleGrouping(prevComponent);
- }
+ }
});
return this;
}
@@ -278,18 +271,18 @@ public class LinearDRPCTopologyBuilder {
@Override
public void declare(String prevComponent, InputDeclarer declarer) {
declarer.localOrShuffleGrouping(prevComponent, streamId);
- }
+ }
});
return this;
}
-
+
@Override
public LinearDRPCInputDeclarer noneGrouping() {
addDeclaration(new InputDeclaration() {
@Override
public void declare(String prevComponent, InputDeclarer declarer) {
declarer.noneGrouping(prevComponent);
- }
+ }
});
return this;
}
@@ -300,7 +293,7 @@ public class LinearDRPCTopologyBuilder {
@Override
public void declare(String prevComponent, InputDeclarer declarer) {
declarer.noneGrouping(prevComponent, streamId);
- }
+ }
});
return this;
}
@@ -311,7 +304,7 @@ public class LinearDRPCTopologyBuilder {
@Override
public void declare(String prevComponent, InputDeclarer declarer) {
declarer.allGrouping(prevComponent);
- }
+ }
});
return this;
}
@@ -322,7 +315,7 @@ public class LinearDRPCTopologyBuilder {
@Override
public void declare(String prevComponent, InputDeclarer declarer) {
declarer.allGrouping(prevComponent, streamId);
- }
+ }
});
return this;
}
@@ -333,7 +326,7 @@ public class LinearDRPCTopologyBuilder {
@Override
public void declare(String prevComponent, InputDeclarer declarer) {
declarer.directGrouping(prevComponent);
- }
+ }
});
return this;
}
@@ -344,7 +337,7 @@ public class LinearDRPCTopologyBuilder {
@Override
public void declare(String prevComponent, InputDeclarer declarer) {
declarer.directGrouping(prevComponent, streamId);
- }
+ }
});
return this;
}
@@ -365,7 +358,7 @@ public class LinearDRPCTopologyBuilder {
@Override
public void declare(String prevComponent, InputDeclarer declarer) {
declarer.customGrouping(prevComponent, grouping);
- }
+ }
});
return this;
}
@@ -376,11 +369,11 @@ public class LinearDRPCTopologyBuilder {
@Override
public void declare(String prevComponent, InputDeclarer declarer) {
declarer.customGrouping(prevComponent, streamId, grouping);
- }
+ }
});
return this;
}
-
+
private void addDeclaration(InputDeclaration declaration) {
_component.declarations.add(declaration);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/drpc/PrepareRequest.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/drpc/PrepareRequest.java b/jstorm-core/src/main/java/backtype/storm/drpc/PrepareRequest.java
index bd32169..fea8b36 100755
--- a/jstorm-core/src/main/java/backtype/storm/drpc/PrepareRequest.java
+++ b/jstorm-core/src/main/java/backtype/storm/drpc/PrepareRequest.java
@@ -28,7 +28,6 @@ import java.util.Map;
import java.util.Random;
import backtype.storm.utils.Utils;
-
public class PrepareRequest extends BaseBasicBolt {
public static final String ARGS_STREAM = Utils.DEFAULT_STREAM_ID;
public static final String RETURN_STREAM = "ret";
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/drpc/ReturnResults.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/drpc/ReturnResults.java b/jstorm-core/src/main/java/backtype/storm/drpc/ReturnResults.java
index 2ca517e..129e2b3 100644
--- a/jstorm-core/src/main/java/backtype/storm/drpc/ReturnResults.java
+++ b/jstorm-core/src/main/java/backtype/storm/drpc/ReturnResults.java
@@ -37,15 +37,14 @@ import org.apache.thrift.TException;
import org.apache.thrift.transport.TTransportException;
import org.json.simple.JSONValue;
-
public class ReturnResults extends BaseRichBolt {
- //ANY CHANGE TO THIS CODE MUST BE SERIALIZABLE COMPATIBLE OR THERE WILL BE PROBLEMS
+ // ANY CHANGE TO THIS CODE MUST BE SERIALIZABLE COMPATIBLE OR THERE WILL BE PROBLEMS
static final long serialVersionUID = -774882142710631591L;
public static final Logger LOG = LoggerFactory.getLogger(ReturnResults.class);
OutputCollector _collector;
boolean local;
- Map _conf;
+ Map _conf;
Map<List, DRPCInvocationsClient> _clients = new HashMap<List, DRPCInvocationsClient>();
@Override
@@ -59,22 +58,24 @@ public class ReturnResults extends BaseRichBolt {
public void execute(Tuple input) {
String result = (String) input.getValue(0);
String returnInfo = (String) input.getValue(1);
- //LOG.info("Receive one message, resultInfo:{}, result:{}", returnInfo, result);
- if(returnInfo!=null) {
+ // LOG.info("Receive one message, resultInfo:{}, result:{}", returnInfo, result);
+ if (returnInfo != null) {
Map retMap = (Map) JSONValue.parse(returnInfo);
final String host = (String) retMap.get("host");
final int port = Utils.getInt(retMap.get("port"));
String id = (String) retMap.get("id");
DistributedRPCInvocations.Iface client;
- if(local) {
+ if (local) {
client = (DistributedRPCInvocations.Iface) ServiceRegistry.getService(host);
} else {
- List server = new ArrayList() {{
- add(host);
- add(port);
- }};
-
- if(!_clients.containsKey(server)) {
+ List server = new ArrayList() {
+ {
+ add(host);
+ add(port);
+ }
+ };
+
+ if (!_clients.containsKey(server)) {
try {
_clients.put(server, new DRPCInvocationsClient(_conf, host, port));
} catch (TTransportException ex) {
@@ -83,7 +84,7 @@ public class ReturnResults extends BaseRichBolt {
}
client = _clients.get(server);
}
-
+
try {
client.result(id, result);
_collector.ack(input);
@@ -93,29 +94,29 @@ public class ReturnResults extends BaseRichBolt {
if (client instanceof DRPCInvocationsClient) {
try {
LOG.info("reconnecting... ");
- ((DRPCInvocationsClient)client).reconnectClient(); //Blocking call
+ ((DRPCInvocationsClient) client).reconnectClient(); // Blocking call
} catch (TException e2) {
throw new RuntimeException(e2);
}
}
- } catch(TException e) {
+ } catch (TException e) {
LOG.error("Failed to return results to DRPC server", e);
_collector.fail(input);
if (client instanceof DRPCInvocationsClient) {
try {
LOG.info("reconnecting... ");
- ((DRPCInvocationsClient)client).reconnectClient(); //Blocking call
+ ((DRPCInvocationsClient) client).reconnectClient(); // Blocking call
} catch (TException e2) {
throw new RuntimeException(e2);
}
}
}
}
- }
+ }
@Override
public void cleanup() {
- for(DRPCInvocationsClient c: _clients.values()) {
+ for (DRPCInvocationsClient c : _clients.values()) {
c.close();
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/generated/AlreadyAliveException.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/generated/AlreadyAliveException.java b/jstorm-core/src/main/java/backtype/storm/generated/AlreadyAliveException.java
index 06eadaf..533b112 100644
--- a/jstorm-core/src/main/java/backtype/storm/generated/AlreadyAliveException.java
+++ b/jstorm-core/src/main/java/backtype/storm/generated/AlreadyAliveException.java
@@ -34,7 +34,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
-@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-7-27")
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-10-20")
public class AlreadyAliveException extends TException implements org.apache.thrift.TBase<AlreadyAliveException, AlreadyAliveException._Fields>, java.io.Serializable, Cloneable, Comparable<AlreadyAliveException> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("AlreadyAliveException");
@@ -264,11 +264,11 @@ public class AlreadyAliveException extends TException implements org.apache.thri
return _Fields.findByThriftId(fieldId);
}
- public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+ public void read(org.apache.thrift.protocol.TProtocol iprot) throws TException {
schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
}
- public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+ public void write(org.apache.thrift.protocol.TProtocol oprot) throws TException {
schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
}
@@ -288,10 +288,10 @@ public class AlreadyAliveException extends TException implements org.apache.thri
return sb.toString();
}
- public void validate() throws org.apache.thrift.TException {
+ public void validate() throws TException {
// check for required fields
if (!is_set_msg()) {
- throw new org.apache.thrift.protocol.TProtocolException("Required field 'msg' is unset! Struct:" + toString());
+ throw new TProtocolException("Required field 'msg' is unset! Struct:" + toString());
}
// check for sub-struct validity
@@ -300,7 +300,7 @@ public class AlreadyAliveException extends TException implements org.apache.thri
private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
try {
write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
- } catch (org.apache.thrift.TException te) {
+ } catch (TException te) {
throw new java.io.IOException(te);
}
}
@@ -308,7 +308,7 @@ public class AlreadyAliveException extends TException implements org.apache.thri
private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
try {
read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
- } catch (org.apache.thrift.TException te) {
+ } catch (TException te) {
throw new java.io.IOException(te);
}
}
@@ -321,7 +321,7 @@ public class AlreadyAliveException extends TException implements org.apache.thri
private static class AlreadyAliveExceptionStandardScheme extends StandardScheme<AlreadyAliveException> {
- public void read(org.apache.thrift.protocol.TProtocol iprot, AlreadyAliveException struct) throws org.apache.thrift.TException {
+ public void read(org.apache.thrift.protocol.TProtocol iprot, AlreadyAliveException struct) throws TException {
org.apache.thrift.protocol.TField schemeField;
iprot.readStructBegin();
while (true)
@@ -348,7 +348,7 @@ public class AlreadyAliveException extends TException implements org.apache.thri
struct.validate();
}
- public void write(org.apache.thrift.protocol.TProtocol oprot, AlreadyAliveException struct) throws org.apache.thrift.TException {
+ public void write(org.apache.thrift.protocol.TProtocol oprot, AlreadyAliveException struct) throws TException {
struct.validate();
oprot.writeStructBegin(STRUCT_DESC);
@@ -372,13 +372,13 @@ public class AlreadyAliveException extends TException implements org.apache.thri
private static class AlreadyAliveExceptionTupleScheme extends TupleScheme<AlreadyAliveException> {
@Override
- public void write(org.apache.thrift.protocol.TProtocol prot, AlreadyAliveException struct) throws org.apache.thrift.TException {
+ public void write(org.apache.thrift.protocol.TProtocol prot, AlreadyAliveException struct) throws TException {
TTupleProtocol oprot = (TTupleProtocol) prot;
oprot.writeString(struct.msg);
}
@Override
- public void read(org.apache.thrift.protocol.TProtocol prot, AlreadyAliveException struct) throws org.apache.thrift.TException {
+ public void read(org.apache.thrift.protocol.TProtocol prot, AlreadyAliveException struct) throws TException {
TTupleProtocol iprot = (TTupleProtocol) prot;
struct.msg = iprot.readString();
struct.set_msg_isSet(true);
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/generated/AuthorizationException.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/generated/AuthorizationException.java b/jstorm-core/src/main/java/backtype/storm/generated/AuthorizationException.java
index 02f72f0..0822f50 100644
--- a/jstorm-core/src/main/java/backtype/storm/generated/AuthorizationException.java
+++ b/jstorm-core/src/main/java/backtype/storm/generated/AuthorizationException.java
@@ -34,7 +34,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
-@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-7-27")
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-10-20")
public class AuthorizationException extends TException implements org.apache.thrift.TBase<AuthorizationException, AuthorizationException._Fields>, java.io.Serializable, Cloneable, Comparable<AuthorizationException> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("AuthorizationException");
@@ -264,11 +264,11 @@ public class AuthorizationException extends TException implements org.apache.thr
return _Fields.findByThriftId(fieldId);
}
- public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+ public void read(org.apache.thrift.protocol.TProtocol iprot) throws TException {
schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
}
- public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+ public void write(org.apache.thrift.protocol.TProtocol oprot) throws TException {
schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
}
@@ -288,10 +288,10 @@ public class AuthorizationException extends TException implements org.apache.thr
return sb.toString();
}
- public void validate() throws org.apache.thrift.TException {
+ public void validate() throws TException {
// check for required fields
if (!is_set_msg()) {
- throw new org.apache.thrift.protocol.TProtocolException("Required field 'msg' is unset! Struct:" + toString());
+ throw new TProtocolException("Required field 'msg' is unset! Struct:" + toString());
}
// check for sub-struct validity
@@ -300,7 +300,7 @@ public class AuthorizationException extends TException implements org.apache.thr
private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
try {
write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
- } catch (org.apache.thrift.TException te) {
+ } catch (TException te) {
throw new java.io.IOException(te);
}
}
@@ -308,7 +308,7 @@ public class AuthorizationException extends TException implements org.apache.thr
private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
try {
read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
- } catch (org.apache.thrift.TException te) {
+ } catch (TException te) {
throw new java.io.IOException(te);
}
}
@@ -321,7 +321,7 @@ public class AuthorizationException extends TException implements org.apache.thr
private static class AuthorizationExceptionStandardScheme extends StandardScheme<AuthorizationException> {
- public void read(org.apache.thrift.protocol.TProtocol iprot, AuthorizationException struct) throws org.apache.thrift.TException {
+ public void read(org.apache.thrift.protocol.TProtocol iprot, AuthorizationException struct) throws TException {
org.apache.thrift.protocol.TField schemeField;
iprot.readStructBegin();
while (true)
@@ -348,7 +348,7 @@ public class AuthorizationException extends TException implements org.apache.thr
struct.validate();
}
- public void write(org.apache.thrift.protocol.TProtocol oprot, AuthorizationException struct) throws org.apache.thrift.TException {
+ public void write(org.apache.thrift.protocol.TProtocol oprot, AuthorizationException struct) throws TException {
struct.validate();
oprot.writeStructBegin(STRUCT_DESC);
@@ -372,13 +372,13 @@ public class AuthorizationException extends TException implements org.apache.thr
private static class AuthorizationExceptionTupleScheme extends TupleScheme<AuthorizationException> {
@Override
- public void write(org.apache.thrift.protocol.TProtocol prot, AuthorizationException struct) throws org.apache.thrift.TException {
+ public void write(org.apache.thrift.protocol.TProtocol prot, AuthorizationException struct) throws TException {
TTupleProtocol oprot = (TTupleProtocol) prot;
oprot.writeString(struct.msg);
}
@Override
- public void read(org.apache.thrift.protocol.TProtocol prot, AuthorizationException struct) throws org.apache.thrift.TException {
+ public void read(org.apache.thrift.protocol.TProtocol prot, AuthorizationException struct) throws TException {
TTupleProtocol iprot = (TTupleProtocol) prot;
struct.msg = iprot.readString();
struct.set_msg_isSet(true);
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/generated/Bolt.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/generated/Bolt.java b/jstorm-core/src/main/java/backtype/storm/generated/Bolt.java
index e3d0a07..9241322 100644
--- a/jstorm-core/src/main/java/backtype/storm/generated/Bolt.java
+++ b/jstorm-core/src/main/java/backtype/storm/generated/Bolt.java
@@ -34,7 +34,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
-@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-7-27")
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-10-20")
public class Bolt implements org.apache.thrift.TBase<Bolt, Bolt._Fields>, java.io.Serializable, Cloneable, Comparable<Bolt> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("Bolt");
@@ -337,11 +337,11 @@ public class Bolt implements org.apache.thrift.TBase<Bolt, Bolt._Fields>, java.i
return _Fields.findByThriftId(fieldId);
}
- public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+ public void read(org.apache.thrift.protocol.TProtocol iprot) throws TException {
schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
}
- public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+ public void write(org.apache.thrift.protocol.TProtocol oprot) throws TException {
schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
}
@@ -369,14 +369,14 @@ public class Bolt implements org.apache.thrift.TBase<Bolt, Bolt._Fields>, java.i
return sb.toString();
}
- public void validate() throws org.apache.thrift.TException {
+ public void validate() throws TException {
// check for required fields
if (!is_set_bolt_object()) {
- throw new org.apache.thrift.protocol.TProtocolException("Required field 'bolt_object' is unset! Struct:" + toString());
+ throw new TProtocolException("Required field 'bolt_object' is unset! Struct:" + toString());
}
if (!is_set_common()) {
- throw new org.apache.thrift.protocol.TProtocolException("Required field 'common' is unset! Struct:" + toString());
+ throw new TProtocolException("Required field 'common' is unset! Struct:" + toString());
}
// check for sub-struct validity
@@ -388,7 +388,7 @@ public class Bolt implements org.apache.thrift.TBase<Bolt, Bolt._Fields>, java.i
private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
try {
write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
- } catch (org.apache.thrift.TException te) {
+ } catch (TException te) {
throw new java.io.IOException(te);
}
}
@@ -396,7 +396,7 @@ public class Bolt implements org.apache.thrift.TBase<Bolt, Bolt._Fields>, java.i
private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
try {
read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
- } catch (org.apache.thrift.TException te) {
+ } catch (TException te) {
throw new java.io.IOException(te);
}
}
@@ -409,7 +409,7 @@ public class Bolt implements org.apache.thrift.TBase<Bolt, Bolt._Fields>, java.i
private static class BoltStandardScheme extends StandardScheme<Bolt> {
- public void read(org.apache.thrift.protocol.TProtocol iprot, Bolt struct) throws org.apache.thrift.TException {
+ public void read(org.apache.thrift.protocol.TProtocol iprot, Bolt struct) throws TException {
org.apache.thrift.protocol.TField schemeField;
iprot.readStructBegin();
while (true)
@@ -446,7 +446,7 @@ public class Bolt implements org.apache.thrift.TBase<Bolt, Bolt._Fields>, java.i
struct.validate();
}
- public void write(org.apache.thrift.protocol.TProtocol oprot, Bolt struct) throws org.apache.thrift.TException {
+ public void write(org.apache.thrift.protocol.TProtocol oprot, Bolt struct) throws TException {
struct.validate();
oprot.writeStructBegin(STRUCT_DESC);
@@ -475,14 +475,14 @@ public class Bolt implements org.apache.thrift.TBase<Bolt, Bolt._Fields>, java.i
private static class BoltTupleScheme extends TupleScheme<Bolt> {
@Override
- public void write(org.apache.thrift.protocol.TProtocol prot, Bolt struct) throws org.apache.thrift.TException {
+ public void write(org.apache.thrift.protocol.TProtocol prot, Bolt struct) throws TException {
TTupleProtocol oprot = (TTupleProtocol) prot;
struct.bolt_object.write(oprot);
struct.common.write(oprot);
}
@Override
- public void read(org.apache.thrift.protocol.TProtocol prot, Bolt struct) throws org.apache.thrift.TException {
+ public void read(org.apache.thrift.protocol.TProtocol prot, Bolt struct) throws TException {
TTupleProtocol iprot = (TTupleProtocol) prot;
struct.bolt_object = new ComponentObject();
struct.bolt_object.read(iprot);