You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/12/01 23:05:06 UTC
[28/51] [partial] storm git commit: Update JStorm to latest release
2.1.0
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/transactional/TransactionalSpoutCoordinator.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/transactional/TransactionalSpoutCoordinator.java b/jstorm-core/src/main/java/backtype/storm/transactional/TransactionalSpoutCoordinator.java
index f7ce534..3768cb1 100755
--- a/jstorm-core/src/main/java/backtype/storm/transactional/TransactionalSpoutCoordinator.java
+++ b/jstorm-core/src/main/java/backtype/storm/transactional/TransactionalSpoutCoordinator.java
@@ -35,40 +35,38 @@ import java.util.Random;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class TransactionalSpoutCoordinator extends BaseRichSpout {
+public class TransactionalSpoutCoordinator extends BaseRichSpout {
public static final Logger LOG = LoggerFactory.getLogger(TransactionalSpoutCoordinator.class);
-
+
public static final BigInteger INIT_TXID = BigInteger.ONE;
-
-
+
public static final String TRANSACTION_BATCH_STREAM_ID = TransactionalSpoutCoordinator.class.getName() + "/batch";
public static final String TRANSACTION_COMMIT_STREAM_ID = TransactionalSpoutCoordinator.class.getName() + "/commit";
private static final String CURRENT_TX = "currtx";
private static final String META_DIR = "meta";
-
+
private ITransactionalSpout _spout;
private ITransactionalSpout.Coordinator _coordinator;
private TransactionalState _state;
private RotatingTransactionalState _coordinatorState;
-
+
TreeMap<BigInteger, TransactionStatus> _activeTx = new TreeMap<BigInteger, TransactionStatus>();
-
+
private SpoutOutputCollector _collector;
private Random _rand;
BigInteger _currTransaction;
int _maxTransactionActive;
StateInitializer _initializer;
-
-
+
public TransactionalSpoutCoordinator(ITransactionalSpout spout) {
_spout = spout;
}
-
+
public ITransactionalSpout getSpout() {
return _spout;
}
-
+
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
_rand = new Random(Utils.secureRandomLong());
@@ -78,7 +76,7 @@ public class TransactionalSpoutCoordinator extends BaseRichSpout {
_coordinator = _spout.getCoordinator(conf, context);
_currTransaction = getStoredCurrTransaction(_state);
Object active = conf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING);
- if(active==null) {
+ if (active == null) {
_maxTransactionActive = 1;
} else {
_maxTransactionActive = Utils.getInt(active);
@@ -100,10 +98,10 @@ public class TransactionalSpoutCoordinator extends BaseRichSpout {
public void ack(Object msgId) {
TransactionAttempt tx = (TransactionAttempt) msgId;
TransactionStatus status = _activeTx.get(tx.getTransactionId());
- if(status!=null && tx.equals(status.attempt)) {
- if(status.status==AttemptStatus.PROCESSING) {
+ if (status != null && tx.equals(status.attempt)) {
+ if (status.status == AttemptStatus.PROCESSING) {
status.status = AttemptStatus.PROCESSED;
- } else if(status.status==AttemptStatus.COMMITTING) {
+ } else if (status.status == AttemptStatus.COMMITTING) {
_activeTx.remove(tx.getTransactionId());
_coordinatorState.cleanupBefore(tx.getTransactionId());
_currTransaction = nextTransactionId(tx.getTransactionId());
@@ -117,12 +115,12 @@ public class TransactionalSpoutCoordinator extends BaseRichSpout {
public void fail(Object msgId) {
TransactionAttempt tx = (TransactionAttempt) msgId;
TransactionStatus stored = _activeTx.remove(tx.getTransactionId());
- if(stored!=null && tx.equals(stored.attempt)) {
+ if (stored != null && tx.equals(stored.attempt)) {
_activeTx.tailMap(tx.getTransactionId()).clear();
sync();
}
}
-
+
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// in partitioned example, in case an emitter task receives a later transaction than it's emitted so far,
@@ -130,24 +128,23 @@ public class TransactionalSpoutCoordinator extends BaseRichSpout {
declarer.declareStream(TRANSACTION_BATCH_STREAM_ID, new Fields("tx", "tx-meta", "committed-txid"));
declarer.declareStream(TRANSACTION_COMMIT_STREAM_ID, new Fields("tx"));
}
-
+
private void sync() {
// note that sometimes the tuples active may be less than max_spout_pending, e.g.
// max_spout_pending = 3
// tx 1, 2, 3 active, tx 2 is acked. there won't be a commit for tx 2 (because tx 1 isn't committed yet),
// and there won't be a batch for tx 4 because there's max_spout_pending tx active
TransactionStatus maybeCommit = _activeTx.get(_currTransaction);
- if(maybeCommit!=null && maybeCommit.status == AttemptStatus.PROCESSED) {
+ if (maybeCommit != null && maybeCommit.status == AttemptStatus.PROCESSED) {
maybeCommit.status = AttemptStatus.COMMITTING;
_collector.emit(TRANSACTION_COMMIT_STREAM_ID, new Values(maybeCommit.attempt), maybeCommit.attempt);
}
-
+
try {
- if(_activeTx.size() < _maxTransactionActive) {
+ if (_activeTx.size() < _maxTransactionActive) {
BigInteger curr = _currTransaction;
- for(int i=0; i<_maxTransactionActive; i++) {
- if((_coordinatorState.hasCache(curr) || _coordinator.isReady())
- && !_activeTx.containsKey(curr)) {
+ for (int i = 0; i < _maxTransactionActive; i++) {
+ if ((_coordinatorState.hasCache(curr) || _coordinator.isReady()) && !_activeTx.containsKey(curr)) {
TransactionAttempt attempt = new TransactionAttempt(curr, _rand.nextLong());
Object state = _coordinatorState.getState(curr, _initializer);
_activeTx.put(curr, new TransactionStatus(attempt));
@@ -155,8 +152,8 @@ public class TransactionalSpoutCoordinator extends BaseRichSpout {
}
curr = nextTransactionId(curr);
}
- }
- } catch(FailedException e) {
+ }
+ } catch (FailedException e) {
LOG.warn("Failed to get metadata for a transaction", e);
}
}
@@ -167,17 +164,15 @@ public class TransactionalSpoutCoordinator extends BaseRichSpout {
ret.setMaxTaskParallelism(1);
return ret;
}
-
+
private static enum AttemptStatus {
- PROCESSING,
- PROCESSED,
- COMMITTING
+ PROCESSING, PROCESSED, COMMITTING
}
-
+
private static class TransactionStatus {
TransactionAttempt attempt;
AttemptStatus status;
-
+
public TransactionStatus(TransactionAttempt attempt) {
this.attempt = attempt;
this.status = AttemptStatus.PROCESSING;
@@ -186,28 +181,29 @@ public class TransactionalSpoutCoordinator extends BaseRichSpout {
@Override
public String toString() {
return attempt.toString() + " <" + status.toString() + ">";
- }
+ }
}
-
-
+
private BigInteger nextTransactionId(BigInteger id) {
return id.add(BigInteger.ONE);
}
-
+
private BigInteger previousTransactionId(BigInteger id) {
- if(id.equals(INIT_TXID)) {
+ if (id.equals(INIT_TXID)) {
return null;
} else {
return id.subtract(BigInteger.ONE);
}
- }
-
+ }
+
private BigInteger getStoredCurrTransaction(TransactionalState state) {
BigInteger ret = (BigInteger) state.getData(CURRENT_TX);
- if(ret==null) return INIT_TXID;
- else return ret;
+ if (ret == null)
+ return INIT_TXID;
+ else
+ return ret;
}
-
+
private class StateInitializer implements RotatingTransactionalState.StateInitializer {
@Override
public Object init(BigInteger txid, Object lastState) {
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/transactional/TransactionalTopologyBuilder.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/transactional/TransactionalTopologyBuilder.java b/jstorm-core/src/main/java/backtype/storm/transactional/TransactionalTopologyBuilder.java
index 98d1163..e775eb5 100755
--- a/jstorm-core/src/main/java/backtype/storm/transactional/TransactionalTopologyBuilder.java
+++ b/jstorm-core/src/main/java/backtype/storm/transactional/TransactionalTopologyBuilder.java
@@ -50,8 +50,7 @@ import java.util.Map;
import java.util.Set;
/**
- * Trident subsumes the functionality provided by transactional topologies, so this
- * class is deprecated.
+ * Trident subsumes the functionality provided by transactional topologies, so this class is deprecated.
*
*/
@Deprecated
@@ -62,16 +61,16 @@ public class TransactionalTopologyBuilder {
Map<String, Component> _bolts = new HashMap<String, Component>();
Integer _spoutParallelism;
List<Map> _spoutConfs = new ArrayList();
-
+
// id is used to store the state of this transactionalspout in zookeeper
- // it would be very dangerous to have 2 topologies active with the same id in the same cluster
+ // it would be very dangerous to have 2 topologies active with the same id in the same cluster
public TransactionalTopologyBuilder(String id, String spoutId, ITransactionalSpout spout, Number spoutParallelism) {
_id = id;
_spoutId = spoutId;
_spout = spout;
_spoutParallelism = (spoutParallelism == null) ? null : spoutParallelism.intValue();
}
-
+
public TransactionalTopologyBuilder(String id, String spoutId, ITransactionalSpout spout) {
this(id, spoutId, spout, null);
}
@@ -79,27 +78,27 @@ public class TransactionalTopologyBuilder {
public TransactionalTopologyBuilder(String id, String spoutId, IPartitionedTransactionalSpout spout, Number spoutParallelism) {
this(id, spoutId, new PartitionedTransactionalSpoutExecutor(spout), spoutParallelism);
}
-
+
public TransactionalTopologyBuilder(String id, String spoutId, IPartitionedTransactionalSpout spout) {
this(id, spoutId, spout, null);
}
-
+
public TransactionalTopologyBuilder(String id, String spoutId, IOpaquePartitionedTransactionalSpout spout, Number spoutParallelism) {
this(id, spoutId, new OpaquePartitionedTransactionalSpoutExecutor(spout), spoutParallelism);
}
-
+
public TransactionalTopologyBuilder(String id, String spoutId, IOpaquePartitionedTransactionalSpout spout) {
this(id, spoutId, spout, null);
}
-
+
public SpoutDeclarer getSpoutDeclarer() {
return new SpoutDeclarerImpl();
}
-
+
public BoltDeclarer setBolt(String id, IBatchBolt bolt) {
return setBolt(id, bolt, null);
}
-
+
public BoltDeclarer setBolt(String id, IBatchBolt bolt, Number parallelism) {
return setBolt(id, new BatchBoltExecutor(bolt), parallelism, bolt instanceof ICommitter);
}
@@ -107,86 +106,79 @@ public class TransactionalTopologyBuilder {
public BoltDeclarer setCommitterBolt(String id, IBatchBolt bolt) {
return setCommitterBolt(id, bolt, null);
}
-
+
public BoltDeclarer setCommitterBolt(String id, IBatchBolt bolt, Number parallelism) {
return setBolt(id, new BatchBoltExecutor(bolt), parallelism, true);
- }
-
+ }
+
public BoltDeclarer setBolt(String id, IBasicBolt bolt) {
return setBolt(id, bolt, null);
- }
-
+ }
+
public BoltDeclarer setBolt(String id, IBasicBolt bolt, Number parallelism) {
return setBolt(id, new BasicBoltExecutor(bolt), parallelism, false);
}
-
+
private BoltDeclarer setBolt(String id, IRichBolt bolt, Number parallelism, boolean committer) {
Integer p = null;
- if(parallelism!=null) p = parallelism.intValue();
+ if (parallelism != null)
+ p = parallelism.intValue();
Component component = new Component(bolt, p, committer);
_bolts.put(id, component);
return new BoltDeclarerImpl(component);
}
-
+
public TopologyBuilder buildTopologyBuilder() {
String coordinator = _spoutId + "/coordinator";
TopologyBuilder builder = new TopologyBuilder();
SpoutDeclarer declarer = builder.setSpout(coordinator, new TransactionalSpoutCoordinator(_spout));
- for(Map conf: _spoutConfs) {
+ for (Map conf : _spoutConfs) {
declarer.addConfigurations(conf);
}
declarer.addConfiguration(Config.TOPOLOGY_TRANSACTIONAL_ID, _id);
- BoltDeclarer emitterDeclarer =
- builder.setBolt(_spoutId,
- new CoordinatedBolt(new TransactionalSpoutBatchExecutor(_spout),
- null,
- null),
- _spoutParallelism)
- .allGrouping(coordinator, TransactionalSpoutCoordinator.TRANSACTION_BATCH_STREAM_ID)
- .addConfiguration(Config.TOPOLOGY_TRANSACTIONAL_ID, _id);
- if(_spout instanceof ICommitterTransactionalSpout) {
+ BoltDeclarer emitterDeclarer =
+ builder.setBolt(_spoutId, new CoordinatedBolt(new TransactionalSpoutBatchExecutor(_spout), null, null), _spoutParallelism)
+ .allGrouping(coordinator, TransactionalSpoutCoordinator.TRANSACTION_BATCH_STREAM_ID)
+ .addConfiguration(Config.TOPOLOGY_TRANSACTIONAL_ID, _id);
+ if (_spout instanceof ICommitterTransactionalSpout) {
emitterDeclarer.allGrouping(coordinator, TransactionalSpoutCoordinator.TRANSACTION_COMMIT_STREAM_ID);
}
- for(String id: _bolts.keySet()) {
+ for (String id : _bolts.keySet()) {
Component component = _bolts.get(id);
Map<String, SourceArgs> coordinatedArgs = new HashMap<String, SourceArgs>();
- for(String c: componentBoltSubscriptions(component)) {
+ for (String c : componentBoltSubscriptions(component)) {
coordinatedArgs.put(c, SourceArgs.all());
}
-
+
IdStreamSpec idSpec = null;
- if(component.committer) {
- idSpec = IdStreamSpec.makeDetectSpec(coordinator, TransactionalSpoutCoordinator.TRANSACTION_COMMIT_STREAM_ID);
+ if (component.committer) {
+ idSpec = IdStreamSpec.makeDetectSpec(coordinator, TransactionalSpoutCoordinator.TRANSACTION_COMMIT_STREAM_ID);
}
- BoltDeclarer input = builder.setBolt(id,
- new CoordinatedBolt(component.bolt,
- coordinatedArgs,
- idSpec),
- component.parallelism);
- for(Map conf: component.componentConfs) {
+ BoltDeclarer input = builder.setBolt(id, new CoordinatedBolt(component.bolt, coordinatedArgs, idSpec), component.parallelism);
+ for (Map conf : component.componentConfs) {
input.addConfigurations(conf);
}
- for(String c: componentBoltSubscriptions(component)) {
+ for (String c : componentBoltSubscriptions(component)) {
input.directGrouping(c, Constants.COORDINATED_STREAM_ID);
}
- for(InputDeclaration d: component.declarations) {
+ for (InputDeclaration d : component.declarations) {
d.declare(input);
}
- if(component.committer) {
- input.allGrouping(coordinator, TransactionalSpoutCoordinator.TRANSACTION_COMMIT_STREAM_ID);
+ if (component.committer) {
+ input.allGrouping(coordinator, TransactionalSpoutCoordinator.TRANSACTION_COMMIT_STREAM_ID);
}
}
return builder;
}
-
+
public StormTopology buildTopology() {
return buildTopologyBuilder().createTopology();
}
-
+
private Set<String> componentBoltSubscriptions(Component component) {
Set<String> ret = new HashSet<String>();
- for(InputDeclaration d: component.declarations) {
+ for (InputDeclaration d : component.declarations) {
ret.add(d.getComponent());
}
return ret;
@@ -198,34 +190,35 @@ public class TransactionalTopologyBuilder {
public List<InputDeclaration> declarations = new ArrayList<InputDeclaration>();
public List<Map> componentConfs = new ArrayList<Map>();
public boolean committer;
-
+
public Component(IRichBolt bolt, Integer parallelism, boolean committer) {
this.bolt = bolt;
this.parallelism = parallelism;
this.committer = committer;
}
}
-
+
private static interface InputDeclaration {
void declare(InputDeclarer declarer);
+
String getComponent();
}
-
+
private class SpoutDeclarerImpl extends BaseConfigurationDeclarer<SpoutDeclarer> implements SpoutDeclarer {
@Override
public SpoutDeclarer addConfigurations(Map conf) {
_spoutConfs.add(conf);
return this;
- }
+ }
}
-
+
private class BoltDeclarerImpl extends BaseConfigurationDeclarer<BoltDeclarer> implements BoltDeclarer {
Component _component;
-
+
public BoltDeclarerImpl(Component component) {
_component = component;
}
-
+
@Override
public BoltDeclarer fieldsGrouping(final String component, final Fields fields) {
addDeclaration(new InputDeclaration() {
@@ -237,7 +230,7 @@ public class TransactionalTopologyBuilder {
@Override
public String getComponent() {
return component;
- }
+ }
});
return this;
}
@@ -248,12 +241,12 @@ public class TransactionalTopologyBuilder {
@Override
public void declare(InputDeclarer declarer) {
declarer.fieldsGrouping(component, streamId, fields);
- }
+ }
@Override
public String getComponent() {
return component;
- }
+ }
});
return this;
}
@@ -264,12 +257,12 @@ public class TransactionalTopologyBuilder {
@Override
public void declare(InputDeclarer declarer) {
declarer.globalGrouping(component);
- }
+ }
@Override
public String getComponent() {
return component;
- }
+ }
});
return this;
}
@@ -280,12 +273,12 @@ public class TransactionalTopologyBuilder {
@Override
public void declare(InputDeclarer declarer) {
declarer.globalGrouping(component, streamId);
- }
+ }
@Override
public String getComponent() {
return component;
- }
+ }
});
return this;
}
@@ -296,12 +289,12 @@ public class TransactionalTopologyBuilder {
@Override
public void declare(InputDeclarer declarer) {
declarer.shuffleGrouping(component);
- }
+ }
@Override
public String getComponent() {
return component;
- }
+ }
});
return this;
}
@@ -312,12 +305,12 @@ public class TransactionalTopologyBuilder {
@Override
public void declare(InputDeclarer declarer) {
declarer.shuffleGrouping(component, streamId);
- }
+ }
@Override
public String getComponent() {
return component;
- }
+ }
});
return this;
}
@@ -328,12 +321,12 @@ public class TransactionalTopologyBuilder {
@Override
public void declare(InputDeclarer declarer) {
declarer.localOrShuffleGrouping(component);
- }
+ }
@Override
public String getComponent() {
return component;
- }
+ }
});
return this;
}
@@ -345,7 +338,7 @@ public class TransactionalTopologyBuilder {
public void declare(InputDeclarer declarer) {
declarer.localOrShuffleGrouping(component, streamId);
}
-
+
@Override
public String getComponent() {
return component;
@@ -353,7 +346,7 @@ public class TransactionalTopologyBuilder {
});
return this;
}
-
+
@Override
public BoltDeclarer localFirstGrouping(final String component) {
addDeclaration(new InputDeclaration() {
@@ -361,7 +354,7 @@ public class TransactionalTopologyBuilder {
public void declare(InputDeclarer declarer) {
declarer.localFirstGrouping(component);
}
-
+
@Override
public String getComponent() {
return component;
@@ -369,7 +362,7 @@ public class TransactionalTopologyBuilder {
});
return this;
}
-
+
@Override
public BoltDeclarer localFirstGrouping(final String component, final String streamId) {
addDeclaration(new InputDeclaration() {
@@ -377,7 +370,7 @@ public class TransactionalTopologyBuilder {
public void declare(InputDeclarer declarer) {
declarer.localFirstGrouping(component, streamId);
}
-
+
@Override
public String getComponent() {
return component;
@@ -385,19 +378,19 @@ public class TransactionalTopologyBuilder {
});
return this;
}
-
+
@Override
public BoltDeclarer noneGrouping(final String component) {
addDeclaration(new InputDeclaration() {
@Override
public void declare(InputDeclarer declarer) {
declarer.noneGrouping(component);
- }
+ }
@Override
public String getComponent() {
return component;
- }
+ }
});
return this;
}
@@ -408,12 +401,12 @@ public class TransactionalTopologyBuilder {
@Override
public void declare(InputDeclarer declarer) {
declarer.noneGrouping(component, streamId);
- }
+ }
@Override
public String getComponent() {
return component;
- }
+ }
});
return this;
}
@@ -424,12 +417,12 @@ public class TransactionalTopologyBuilder {
@Override
public void declare(InputDeclarer declarer) {
declarer.allGrouping(component);
- }
+ }
@Override
public String getComponent() {
return component;
- }
+ }
});
return this;
}
@@ -440,12 +433,12 @@ public class TransactionalTopologyBuilder {
@Override
public void declare(InputDeclarer declarer) {
declarer.allGrouping(component, streamId);
- }
+ }
@Override
public String getComponent() {
return component;
- }
+ }
});
return this;
}
@@ -456,12 +449,12 @@ public class TransactionalTopologyBuilder {
@Override
public void declare(InputDeclarer declarer) {
declarer.directGrouping(component);
- }
+ }
@Override
public String getComponent() {
return component;
- }
+ }
});
return this;
}
@@ -472,12 +465,12 @@ public class TransactionalTopologyBuilder {
@Override
public void declare(InputDeclarer declarer) {
declarer.directGrouping(component, streamId);
- }
+ }
@Override
public String getComponent() {
return component;
- }
+ }
});
return this;
}
@@ -498,14 +491,14 @@ public class TransactionalTopologyBuilder {
@Override
public void declare(InputDeclarer declarer) {
declarer.customGrouping(component, grouping);
- }
+ }
@Override
public String getComponent() {
return component;
- }
+ }
});
- return this;
+ return this;
}
@Override
@@ -514,12 +507,12 @@ public class TransactionalTopologyBuilder {
@Override
public void declare(InputDeclarer declarer) {
declarer.customGrouping(component, streamId, grouping);
- }
+ }
@Override
public String getComponent() {
return component;
- }
+ }
});
return this;
}
@@ -530,16 +523,16 @@ public class TransactionalTopologyBuilder {
@Override
public void declare(InputDeclarer declarer) {
declarer.grouping(stream, grouping);
- }
+ }
@Override
public String getComponent() {
return stream.get_componentId();
- }
+ }
});
return this;
}
-
+
private void addDeclaration(InputDeclaration declaration) {
_component.declarations.add(declaration);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/transactional/partitioned/IOpaquePartitionedTransactionalSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/transactional/partitioned/IOpaquePartitionedTransactionalSpout.java b/jstorm-core/src/main/java/backtype/storm/transactional/partitioned/IOpaquePartitionedTransactionalSpout.java
index 8d1f60b..35fb1c6 100755
--- a/jstorm-core/src/main/java/backtype/storm/transactional/partitioned/IOpaquePartitionedTransactionalSpout.java
+++ b/jstorm-core/src/main/java/backtype/storm/transactional/partitioned/IOpaquePartitionedTransactionalSpout.java
@@ -24,33 +24,34 @@ import backtype.storm.transactional.TransactionAttempt;
import java.util.Map;
/**
- * This defines a transactional spout which does *not* necessarily
- * replay the same batch every time it emits a batch for a transaction id.
+ * This defines a transactional spout which does *not* necessarily replay the same batch every time it emits a batch for a transaction id.
*/
public interface IOpaquePartitionedTransactionalSpout<T> extends IComponent {
public interface Coordinator {
/**
* Returns true if its ok to emit start a new transaction, false otherwise (will skip this transaction).
*
- * You should sleep here if you want a delay between asking for the next transaction (this will be called
- * repeatedly in a loop).
+ * You should sleep here if you want a delay between asking for the next transaction (this will be called repeatedly in a loop).
*/
boolean isReady();
+
void close();
}
-
+
public interface Emitter<X> {
/**
- * Emit a batch of tuples for a partition/transaction.
+ * Emit a batch of tuples for a partition/transaction.
*
- * Return the metadata describing this batch that will be used as lastPartitionMeta
- * for defining the parameters of the next batch.
+ * Return the metadata describing this batch that will be used as lastPartitionMeta for defining the parameters of the next batch.
*/
X emitPartitionBatch(TransactionAttempt tx, BatchOutputCollector collector, int partition, X lastPartitionMeta);
+
int numPartitions();
+
void close();
}
-
- Emitter<T> getEmitter(Map conf, TopologyContext context);
- Coordinator getCoordinator(Map conf, TopologyContext context);
+
+ Emitter<T> getEmitter(Map conf, TopologyContext context);
+
+ Coordinator getCoordinator(Map conf, TopologyContext context);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/transactional/partitioned/IPartitionedTransactionalSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/transactional/partitioned/IPartitionedTransactionalSpout.java b/jstorm-core/src/main/java/backtype/storm/transactional/partitioned/IPartitionedTransactionalSpout.java
index e428328..7b1e4fb 100755
--- a/jstorm-core/src/main/java/backtype/storm/transactional/partitioned/IPartitionedTransactionalSpout.java
+++ b/jstorm-core/src/main/java/backtype/storm/transactional/partitioned/IPartitionedTransactionalSpout.java
@@ -24,46 +24,43 @@ import backtype.storm.coordination.BatchOutputCollector;
import java.util.Map;
/**
- * This interface defines a transactional spout that reads its tuples from a partitioned set of
- * brokers. It automates the storing of metadata for each partition to ensure that the same batch
- * is always emitted for the same transaction id. The partition metadata is stored in Zookeeper.
+ * This interface defines a transactional spout that reads its tuples from a partitioned set of brokers. It automates the storing of metadata for each partition
+ * to ensure that the same batch is always emitted for the same transaction id. The partition metadata is stored in Zookeeper.
*/
public interface IPartitionedTransactionalSpout<T> extends IComponent {
public interface Coordinator {
/**
- * Return the number of partitions currently in the source of data. The idea is
- * is that if a new partition is added and a prior transaction is replayed, it doesn't
- * emit tuples for the new partition because it knows how many partitions were in
- * that transaction.
+ * Return the number of partitions currently in the source of data. The idea is is that if a new partition is added and a prior transaction is replayed,
+ * it doesn't emit tuples for the new partition because it knows how many partitions were in that transaction.
*/
int numPartitions();
-
+
/**
* Returns true if its ok to emit start a new transaction, false otherwise (will skip this transaction).
*
- * You should sleep here if you want a delay between asking for the next transaction (this will be called
- * repeatedly in a loop).
+ * You should sleep here if you want a delay between asking for the next transaction (this will be called repeatedly in a loop).
*/
boolean isReady();
-
+
void close();
}
-
+
public interface Emitter<X> {
/**
- * Emit a batch of tuples for a partition/transaction that's never been emitted before.
- * Return the metadata that can be used to reconstruct this partition/batch in the future.
+ * Emit a batch of tuples for a partition/transaction that's never been emitted before. Return the metadata that can be used to reconstruct this
+ * partition/batch in the future.
*/
X emitPartitionBatchNew(TransactionAttempt tx, BatchOutputCollector collector, int partition, X lastPartitionMeta);
/**
- * Emit a batch of tuples for a partition/transaction that has been emitted before, using
- * the metadata created when it was first emitted.
+ * Emit a batch of tuples for a partition/transaction that has been emitted before, using the metadata created when it was first emitted.
*/
void emitPartitionBatch(TransactionAttempt tx, BatchOutputCollector collector, int partition, X partitionMeta);
+
void close();
}
-
+
Coordinator getCoordinator(Map conf, TopologyContext context);
- Emitter<T> getEmitter(Map conf, TopologyContext context);
+
+ Emitter<T> getEmitter(Map conf, TopologyContext context);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/transactional/partitioned/OpaquePartitionedTransactionalSpoutExecutor.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/transactional/partitioned/OpaquePartitionedTransactionalSpoutExecutor.java b/jstorm-core/src/main/java/backtype/storm/transactional/partitioned/OpaquePartitionedTransactionalSpoutExecutor.java
index aabcb7a..4f894d9 100755
--- a/jstorm-core/src/main/java/backtype/storm/transactional/partitioned/OpaquePartitionedTransactionalSpoutExecutor.java
+++ b/jstorm-core/src/main/java/backtype/storm/transactional/partitioned/OpaquePartitionedTransactionalSpoutExecutor.java
@@ -33,17 +33,16 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.TreeMap;
-
public class OpaquePartitionedTransactionalSpoutExecutor implements ICommitterTransactionalSpout<Object> {
IOpaquePartitionedTransactionalSpout _spout;
-
+
public class Coordinator implements ITransactionalSpout.Coordinator<Object> {
IOpaquePartitionedTransactionalSpout.Coordinator _coordinator;
public Coordinator(Map conf, TopologyContext context) {
_coordinator = _spout.getCoordinator(conf, context);
}
-
+
@Override
public Object initializeTransaction(BigInteger txid, Object prevMetadata) {
return null;
@@ -52,14 +51,14 @@ public class OpaquePartitionedTransactionalSpoutExecutor implements ICommitterTr
@Override
public boolean isReady() {
return _coordinator.isReady();
- }
+ }
@Override
public void close() {
_coordinator.close();
- }
+ }
}
-
+
public class Emitter implements ICommitterTransactionalSpout.Emitter {
IOpaquePartitionedTransactionalSpout.Emitter _emitter;
TransactionalState _state;
@@ -67,21 +66,21 @@ public class OpaquePartitionedTransactionalSpoutExecutor implements ICommitterTr
Map<Integer, RotatingTransactionalState> _partitionStates = new HashMap<Integer, RotatingTransactionalState>();
int _index;
int _numTasks;
-
+
public Emitter(Map conf, TopologyContext context) {
_emitter = _spout.getEmitter(conf, context);
_index = context.getThisTaskIndex();
_numTasks = context.getComponentTasks(context.getThisComponentId()).size();
- _state = TransactionalState.newUserState(conf, (String) conf.get(Config.TOPOLOGY_TRANSACTIONAL_ID), getComponentConfiguration());
+ _state = TransactionalState.newUserState(conf, (String) conf.get(Config.TOPOLOGY_TRANSACTIONAL_ID), getComponentConfiguration());
List<String> existingPartitions = _state.list("");
- for(String p: existingPartitions) {
+ for (String p : existingPartitions) {
int partition = Integer.parseInt(p);
- if((partition - _index) % _numTasks == 0) {
+ if ((partition - _index) % _numTasks == 0) {
_partitionStates.put(partition, new RotatingTransactionalState(_state, p));
}
}
}
-
+
@Override
public void emitBatch(TransactionAttempt tx, Object coordinatorMeta, BatchOutputCollector collector) {
Map<Integer, Object> metas = new HashMap<Integer, Object>();
@@ -89,21 +88,22 @@ public class OpaquePartitionedTransactionalSpoutExecutor implements ICommitterTr
int partitions = _emitter.numPartitions();
Entry<BigInteger, Map<Integer, Object>> entry = _cachedMetas.lowerEntry(tx.getTransactionId());
Map<Integer, Object> prevCached;
- if(entry!=null) {
+ if (entry != null) {
prevCached = entry.getValue();
} else {
prevCached = new HashMap<Integer, Object>();
}
-
- for(int i=_index; i < partitions; i+=_numTasks) {
+
+ for (int i = _index; i < partitions; i += _numTasks) {
RotatingTransactionalState state = _partitionStates.get(i);
- if(state==null) {
+ if (state == null) {
state = new RotatingTransactionalState(_state, "" + i);
_partitionStates.put(i, state);
}
state.removeState(tx.getTransactionId());
Object lastMeta = prevCached.get(i);
- if(lastMeta==null) lastMeta = state.getLastState();
+ if (lastMeta == null)
+ lastMeta = state.getLastState();
Object meta = _emitter.emitPartitionBatch(tx, collector, i, lastMeta);
metas.put(i, meta);
}
@@ -111,16 +111,16 @@ public class OpaquePartitionedTransactionalSpoutExecutor implements ICommitterTr
@Override
public void cleanupBefore(BigInteger txid) {
- for(RotatingTransactionalState state: _partitionStates.values()) {
+ for (RotatingTransactionalState state : _partitionStates.values()) {
state.cleanupBefore(txid);
- }
+ }
}
@Override
public void commit(TransactionAttempt attempt) {
BigInteger txid = attempt.getTransactionId();
Map<Integer, Object> metas = _cachedMetas.remove(txid);
- for(Integer partition: metas.keySet()) {
+ for (Integer partition : metas.keySet()) {
Object meta = metas.get(partition);
_partitionStates.get(partition).overrideState(txid, meta);
}
@@ -130,12 +130,12 @@ public class OpaquePartitionedTransactionalSpoutExecutor implements ICommitterTr
public void close() {
_emitter.close();
}
- }
-
+ }
+
public OpaquePartitionedTransactionalSpoutExecutor(IOpaquePartitionedTransactionalSpout spout) {
_spout = spout;
}
-
+
@Override
public ITransactionalSpout.Coordinator<Object> getCoordinator(Map conf, TopologyContext context) {
return new Coordinator(conf, context);
@@ -155,5 +155,5 @@ public class OpaquePartitionedTransactionalSpoutExecutor implements ICommitterTr
public Map<String, Object> getComponentConfiguration() {
return _spout.getComponentConfiguration();
}
-
+
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/transactional/partitioned/PartitionedTransactionalSpoutExecutor.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/transactional/partitioned/PartitionedTransactionalSpoutExecutor.java b/jstorm-core/src/main/java/backtype/storm/transactional/partitioned/PartitionedTransactionalSpoutExecutor.java
index 479dda4..8422576 100644
--- a/jstorm-core/src/main/java/backtype/storm/transactional/partitioned/PartitionedTransactionalSpoutExecutor.java
+++ b/jstorm-core/src/main/java/backtype/storm/transactional/partitioned/PartitionedTransactionalSpoutExecutor.java
@@ -29,30 +29,29 @@ import java.math.BigInteger;
import java.util.HashMap;
import java.util.Map;
-
public class PartitionedTransactionalSpoutExecutor implements ITransactionalSpout<Integer> {
IPartitionedTransactionalSpout _spout;
-
+
public PartitionedTransactionalSpoutExecutor(IPartitionedTransactionalSpout spout) {
_spout = spout;
}
-
+
public IPartitionedTransactionalSpout getPartitionedSpout() {
return _spout;
}
-
+
class Coordinator implements ITransactionalSpout.Coordinator<Integer> {
private IPartitionedTransactionalSpout.Coordinator _coordinator;
-
+
public Coordinator(Map conf, TopologyContext context) {
_coordinator = _spout.getCoordinator(conf, context);
}
-
+
@Override
public Integer initializeTransaction(BigInteger txid, Integer prevMetadata) {
return _coordinator.numPartitions();
}
-
+
@Override
public boolean isReady() {
return _coordinator.isReady();
@@ -61,53 +60,51 @@ public class PartitionedTransactionalSpoutExecutor implements ITransactionalSpou
@Override
public void close() {
_coordinator.close();
- }
+ }
}
-
+
class Emitter implements ITransactionalSpout.Emitter<Integer> {
private IPartitionedTransactionalSpout.Emitter _emitter;
private TransactionalState _state;
private Map<Integer, RotatingTransactionalState> _partitionStates = new HashMap<Integer, RotatingTransactionalState>();
private int _index;
private int _numTasks;
-
+
public Emitter(Map conf, TopologyContext context) {
_emitter = _spout.getEmitter(conf, context);
- _state = TransactionalState.newUserState(conf, (String) conf.get(Config.TOPOLOGY_TRANSACTIONAL_ID), getComponentConfiguration());
+ _state = TransactionalState.newUserState(conf, (String) conf.get(Config.TOPOLOGY_TRANSACTIONAL_ID), getComponentConfiguration());
_index = context.getThisTaskIndex();
_numTasks = context.getComponentTasks(context.getThisComponentId()).size();
}
@Override
- public void emitBatch(final TransactionAttempt tx, final Integer partitions,
- final BatchOutputCollector collector) {
- for(int i=_index; i < partitions; i+=_numTasks) {
- if(!_partitionStates.containsKey(i)) {
+ public void emitBatch(final TransactionAttempt tx, final Integer partitions, final BatchOutputCollector collector) {
+ for (int i = _index; i < partitions; i += _numTasks) {
+ if (!_partitionStates.containsKey(i)) {
_partitionStates.put(i, new RotatingTransactionalState(_state, "" + i));
}
RotatingTransactionalState state = _partitionStates.get(i);
final int partition = i;
- Object meta = state.getStateOrCreate(tx.getTransactionId(),
- new RotatingTransactionalState.StateInitializer() {
+ Object meta = state.getStateOrCreate(tx.getTransactionId(), new RotatingTransactionalState.StateInitializer() {
@Override
public Object init(BigInteger txid, Object lastState) {
return _emitter.emitPartitionBatchNew(tx, collector, partition, lastState);
}
});
// it's null if one of:
- // a) a later transaction batch was emitted before this, so we should skip this batch
- // b) if didn't exist and was created (in which case the StateInitializer was invoked and
- // it was emitted
- if(meta!=null) {
+ // a) a later transaction batch was emitted before this, so we should skip this batch
+ // b) if didn't exist and was created (in which case the StateInitializer was invoked and
+ // it was emitted
+ if (meta != null) {
_emitter.emitPartitionBatch(tx, collector, partition, meta);
}
}
-
+
}
@Override
public void cleanupBefore(BigInteger txid) {
- for(RotatingTransactionalState state: _partitionStates.values()) {
+ for (RotatingTransactionalState state : _partitionStates.values()) {
state.cleanupBefore(txid);
}
}
@@ -117,7 +114,7 @@ public class PartitionedTransactionalSpoutExecutor implements ITransactionalSpou
_state.close();
_emitter.close();
}
- }
+ }
@Override
public ITransactionalSpout.Coordinator getCoordinator(Map conf, TopologyContext context) {
@@ -138,5 +135,5 @@ public class PartitionedTransactionalSpoutExecutor implements ITransactionalSpou
public Map<String, Object> getComponentConfiguration() {
return _spout.getComponentConfiguration();
}
-
+
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/transactional/state/RotatingTransactionalState.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/transactional/state/RotatingTransactionalState.java b/jstorm-core/src/main/java/backtype/storm/transactional/state/RotatingTransactionalState.java
index 20c5cd3..63aced9 100644
--- a/jstorm-core/src/main/java/backtype/storm/transactional/state/RotatingTransactionalState.java
+++ b/jstorm-core/src/main/java/backtype/storm/transactional/state/RotatingTransactionalState.java
@@ -27,19 +27,19 @@ import java.util.SortedMap;
import java.util.TreeMap;
/**
- * A map from txid to a value. Automatically deletes txids that have been committed.
+ * A map from txid to a value. Automatically deletes txids that have been committed.
*/
public class RotatingTransactionalState {
public static interface StateInitializer {
Object init(BigInteger txid, Object lastState);
- }
+ }
private TransactionalState _state;
private String _subdir;
private boolean _strictOrder;
-
+
private TreeMap<BigInteger, Object> _curr = new TreeMap<BigInteger, Object>();
-
+
public RotatingTransactionalState(TransactionalState state, String subdir, boolean strictOrder) {
_state = state;
_subdir = subdir;
@@ -51,32 +51,35 @@ public class RotatingTransactionalState {
public RotatingTransactionalState(TransactionalState state, String subdir) {
this(state, subdir, false);
}
-
+
public Object getLastState() {
- if(_curr.isEmpty()) return null;
- else return _curr.lastEntry().getValue();
+ if (_curr.isEmpty())
+ return null;
+ else
+ return _curr.lastEntry().getValue();
}
-
+
public void overrideState(BigInteger txid, Object state) {
_state.setData(txPath(txid), state);
_curr.put(txid, state);
}
public void removeState(BigInteger txid) {
- if(_curr.containsKey(txid)) {
+ if (_curr.containsKey(txid)) {
_curr.remove(txid);
_state.delete(txPath(txid));
}
}
-
+
public Object getState(BigInteger txid, StateInitializer init) {
- if(!_curr.containsKey(txid)) {
+ if (!_curr.containsKey(txid)) {
SortedMap<BigInteger, Object> prevMap = _curr.headMap(txid);
- SortedMap<BigInteger, Object> afterMap = _curr.tailMap(txid);
-
+ SortedMap<BigInteger, Object> afterMap = _curr.tailMap(txid);
+
BigInteger prev = null;
- if(!prevMap.isEmpty()) prev = prevMap.lastKey();
-
+ if (!prevMap.isEmpty())
+ prev = prevMap.lastKey();
+
if (_strictOrder) {
if (prev == null && !txid.equals(TransactionalSpoutCoordinator.INIT_TXID)) {
throw new IllegalStateException("Trying to initialize transaction for which there should be a previous state");
@@ -88,7 +91,7 @@ public class RotatingTransactionalState {
throw new IllegalStateException("Expecting tx state to be initialized in strict order but there are txids after that have state");
}
}
-
+
Object data;
if (afterMap.isEmpty()) {
Object prevData;
@@ -106,11 +109,11 @@ public class RotatingTransactionalState {
}
return _curr.get(txid);
}
-
+
public boolean hasCache(BigInteger txid) {
return _curr.containsKey(txid);
}
-
+
/**
* Returns null if it was created, the value otherwise.
*/
@@ -122,7 +125,7 @@ public class RotatingTransactionalState {
return null;
}
}
-
+
public void cleanupBefore(BigInteger txid) {
Set<BigInteger> toDelete = new HashSet<BigInteger>();
toDelete.addAll(_curr.headMap(txid).keySet());
@@ -131,21 +134,21 @@ public class RotatingTransactionalState {
_state.delete(txPath(tx));
}
}
-
+
private void sync() {
List<String> txids = _state.list(_subdir);
- for(String txid_s: txids) {
+ for (String txid_s : txids) {
Object data = _state.getData(txPath(txid_s));
_curr.put(new BigInteger(txid_s), data);
}
}
-
+
private String txPath(BigInteger tx) {
return txPath(tx.toString());
}
private String txPath(String tx) {
return _subdir + "/" + tx;
- }
-
+ }
+
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/transactional/state/TestTransactionalState.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/transactional/state/TestTransactionalState.java b/jstorm-core/src/main/java/backtype/storm/transactional/state/TestTransactionalState.java
index 3d4a463..02b3d0d 100755
--- a/jstorm-core/src/main/java/backtype/storm/transactional/state/TestTransactionalState.java
+++ b/jstorm-core/src/main/java/backtype/storm/transactional/state/TestTransactionalState.java
@@ -32,16 +32,13 @@ import org.apache.zookeeper.data.ACL;
public class TestTransactionalState extends TransactionalState {
/**
- * Matching constructor in absence of a default constructor in the parent
- * class.
+ * Matching constructor in absence of a default constructor in the parent class.
*/
protected TestTransactionalState(Map conf, String id, Map componentConf, String subroot) {
super(conf, id, componentConf, subroot);
}
- public static void createNode(CuratorFramework curator,
- String rootDir, byte[] data, List<ACL> acls, CreateMode mode)
- throws Exception {
- TransactionalState.createNode(curator, rootDir, data, acls, mode);
+ public static void createNode(CuratorFramework curator, String rootDir, byte[] data, List<ACL> acls, CreateMode mode) throws Exception {
+ TransactionalState.createNode(curator, rootDir, data, acls, mode);
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/transactional/state/TransactionalState.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/transactional/state/TransactionalState.java b/jstorm-core/src/main/java/backtype/storm/transactional/state/TransactionalState.java
index 5afcd0a..71d7cc3 100755
--- a/jstorm-core/src/main/java/backtype/storm/transactional/state/TransactionalState.java
+++ b/jstorm-core/src/main/java/backtype/storm/transactional/state/TransactionalState.java
@@ -40,25 +40,23 @@ public class TransactionalState {
KryoValuesSerializer _ser;
KryoValuesDeserializer _des;
List<ACL> _zkAcls = null;
-
+
public static TransactionalState newUserState(Map conf, String id, Map componentConf) {
return new TransactionalState(conf, id, componentConf, "user");
}
-
+
public static TransactionalState newCoordinatorState(Map conf, String id, Map componentConf) {
- return new TransactionalState(conf, id, componentConf, "coordinator");
+ return new TransactionalState(conf, id, componentConf, "coordinator");
}
-
+
protected TransactionalState(Map conf, String id, Map componentConf, String subroot) {
try {
conf = new HashMap(conf);
// ensure that the serialization registrations are consistent with the declarations in this spout
- if(componentConf!=null) {
- conf.put(Config.TOPOLOGY_KRYO_REGISTER,
- componentConf
- .get(Config.TOPOLOGY_KRYO_REGISTER));
+ if (componentConf != null) {
+ conf.put(Config.TOPOLOGY_KRYO_REGISTER, componentConf.get(Config.TOPOLOGY_KRYO_REGISTER));
}
- String transactionalRoot = (String)conf.get(Config.TRANSACTIONAL_ZOOKEEPER_ROOT);
+ String transactionalRoot = (String) conf.get(Config.TRANSACTIONAL_ZOOKEEPER_ROOT);
String rootDir = transactionalRoot + "/" + id + "/" + subroot;
List<String> servers = (List<String>) getWithBackup(conf, Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, Config.STORM_ZOOKEEPER_SERVERS);
Object port = getWithBackup(conf, Config.TRANSACTIONAL_ZOOKEEPER_PORT, Config.STORM_ZOOKEEPER_PORT);
@@ -74,29 +72,24 @@ public class TransactionalState {
} catch (KeeperException.NodeExistsException e) {
}
initter.close();
-
+
_curator = Utils.newCuratorStarted(conf, servers, port, rootDir, auth);
_ser = new KryoValuesSerializer(conf);
_des = new KryoValuesDeserializer(conf);
} catch (Exception e) {
- throw new RuntimeException(e);
+ throw new RuntimeException(e);
}
}
- protected static String forPath(PathAndBytesable<String> builder,
- String path, byte[] data) throws Exception {
- return (data == null)
- ? builder.forPath(path)
- : builder.forPath(path, data);
+ protected static String forPath(PathAndBytesable<String> builder, String path, byte[] data) throws Exception {
+ return (data == null) ? builder.forPath(path) : builder.forPath(path, data);
}
- protected static void createNode(CuratorFramework curator, String path,
- byte[] data, List<ACL> acls, CreateMode mode) throws Exception {
- ProtectACLCreateModePathAndBytesable<String> builder =
- curator.create().creatingParentsIfNeeded();
-
+ protected static void createNode(CuratorFramework curator, String path, byte[] data, List<ACL> acls, CreateMode mode) throws Exception {
+ ProtectACLCreateModePathAndBytesable<String> builder = curator.create().creatingParentsIfNeeded();
+
if (acls == null) {
- if (mode == null ) {
+ if (mode == null) {
TransactionalState.forPath(builder, path, data);
} else {
TransactionalState.forPath(builder.withMode(mode), path, data);
@@ -111,17 +104,16 @@ public class TransactionalState {
path = "/" + path;
byte[] ser = _ser.serializeObject(obj);
try {
- if(_curator.checkExists().forPath(path)!=null) {
+ if (_curator.checkExists().forPath(path) != null) {
_curator.setData().forPath(path, ser);
} else {
- TransactionalState.createNode(_curator, path, ser, _zkAcls,
- CreateMode.PERSISTENT);
+ TransactionalState.createNode(_curator, path, ser, _zkAcls, CreateMode.PERSISTENT);
}
- } catch(Exception e) {
+ } catch (Exception e) {
throw new RuntimeException(e);
- }
+ }
}
-
+
public void delete(String path) {
path = "/" + path;
try {
@@ -130,44 +122,45 @@ public class TransactionalState {
throw new RuntimeException(e);
}
}
-
+
public List<String> list(String path) {
path = "/" + path;
try {
- if(_curator.checkExists().forPath(path)==null) {
+ if (_curator.checkExists().forPath(path) == null) {
return new ArrayList<String>();
} else {
return _curator.getChildren().forPath(path);
}
- } catch(Exception e) {
+ } catch (Exception e) {
throw new RuntimeException(e);
- }
+ }
}
-
+
public void mkdir(String path) {
setData(path, 7);
}
-
+
public Object getData(String path) {
path = "/" + path;
try {
- if(_curator.checkExists().forPath(path)!=null) {
+ if (_curator.checkExists().forPath(path) != null) {
return _des.deserializeObject(_curator.getData().forPath(path));
} else {
return null;
}
- } catch(Exception e) {
+ } catch (Exception e) {
throw new RuntimeException(e);
}
}
-
+
public void close() {
_curator.close();
}
-
+
private Object getWithBackup(Map amap, Object primary, Object backup) {
Object ret = amap.get(primary);
- if(ret==null) return amap.get(backup);
+ if (ret == null)
+ return amap.get(backup);
return ret;
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/tuple/BatchTuple.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/tuple/BatchTuple.java b/jstorm-core/src/main/java/backtype/storm/tuple/BatchTuple.java
index 47df545..eb3d0ce 100644
--- a/jstorm-core/src/main/java/backtype/storm/tuple/BatchTuple.java
+++ b/jstorm-core/src/main/java/backtype/storm/tuple/BatchTuple.java
@@ -20,11 +20,10 @@ package backtype.storm.tuple;
import java.util.ArrayList;
import java.util.List;
-
-public class BatchTuple {
+public class BatchTuple implements ITupleExt{
private int targetTaskId;
- private List<Tuple> batch;
+ private List<Tuple> batch = new ArrayList<Tuple>();
private int batchSize;
public BatchTuple() {
@@ -37,15 +36,12 @@ public class BatchTuple {
}
public void addToBatch(Tuple tuple) {
- if (batch == null) {
- batch = new ArrayList<Tuple>();
- }
batch.add(tuple);
}
public boolean isBatchFull() {
boolean ret = false;
- if (batch != null && batch.size() >= batchSize)
+ if (batch.size() >= batchSize)
ret = true;
return ret;
@@ -60,7 +56,7 @@ public class BatchTuple {
}
public int currBatchSize() {
- return batch == null ? 0 : batch.size();
+ return batch.size();
}
public void setTargetTaskId(int taskId) {
@@ -74,4 +70,16 @@ public class BatchTuple {
public void setBatchSize(int batchSize) {
this.batchSize = batchSize;
}
-}
+
+ @Deprecated
+ public long getCreationTimeStamp() {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ @Deprecated
+ public void setCreationTimeStamp(long timeStamp) {
+ // TODO Auto-generated method stub
+
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/tuple/Fields.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/tuple/Fields.java b/jstorm-core/src/main/java/backtype/storm/tuple/Fields.java
index 9805ba6..6ba1e5c 100644
--- a/jstorm-core/src/main/java/backtype/storm/tuple/Fields.java
+++ b/jstorm-core/src/main/java/backtype/storm/tuple/Fields.java
@@ -28,26 +28,24 @@ import java.io.Serializable;
public class Fields implements Iterable<String>, Serializable {
private List<String> _fields;
private Map<String, Integer> _index = new HashMap<String, Integer>();
-
+
public Fields(String... fields) {
this(Arrays.asList(fields));
}
-
+
public Fields(List<String> fields) {
_fields = new ArrayList<String>(fields.size());
for (String field : fields) {
if (_fields.contains(field))
- throw new IllegalArgumentException(
- String.format("duplicate field '%s'", field)
- );
+ throw new IllegalArgumentException(String.format("duplicate field '%s'", field));
_fields.add(field);
}
index();
}
-
+
public List<Object> select(Fields selector, List<Object> tuple) {
List<Object> ret = new ArrayList<Object>(selector.size());
- for(String s: selector) {
+ for (String s : selector) {
ret.add(tuple.get(_index.get(s)));
}
return ret;
@@ -56,7 +54,7 @@ public class Fields implements Iterable<String>, Serializable {
public List<String> toList() {
return new ArrayList<String>(_fields);
}
-
+
public int size() {
return _fields.size();
}
@@ -68,27 +66,27 @@ public class Fields implements Iterable<String>, Serializable {
public Iterator<String> iterator() {
return _fields.iterator();
}
-
+
/**
* Returns the position of the specified field.
*/
public int fieldIndex(String field) {
Integer ret = _index.get(field);
- if(ret==null) {
+ if (ret == null) {
throw new IllegalArgumentException(field + " does not exist");
}
return ret;
}
-
+
/**
* Returns true if this contains the specified name of the field.
*/
public boolean contains(String field) {
return _index.containsKey(field);
}
-
+
private void index() {
- for(int i=0; i<_fields.size(); i++) {
+ for (int i = 0; i < _fields.size(); i++) {
_index.put(_fields.get(i), i);
}
}
@@ -96,5 +94,5 @@ public class Fields implements Iterable<String>, Serializable {
@Override
public String toString() {
return _fields.toString();
- }
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/tuple/ITuple.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/tuple/ITuple.java b/jstorm-core/src/main/java/backtype/storm/tuple/ITuple.java
index c85848d..21696b5 100755
--- a/jstorm-core/src/main/java/backtype/storm/tuple/ITuple.java
+++ b/jstorm-core/src/main/java/backtype/storm/tuple/ITuple.java
@@ -52,60 +52,50 @@ public interface ITuple {
public Object getValue(int i);
/**
- * Returns the String at position i in the tuple. If that field is not a String,
- * you will get a runtime error.
+ * Returns the String at position i in the tuple. If that field is not a String, you will get a runtime error.
*/
public String getString(int i);
/**
- * Returns the Integer at position i in the tuple. If that field is not an Integer,
- * you will get a runtime error.
+ * Returns the Integer at position i in the tuple. If that field is not an Integer, you will get a runtime error.
*/
public Integer getInteger(int i);
/**
- * Returns the Long at position i in the tuple. If that field is not a Long,
- * you will get a runtime error.
+ * Returns the Long at position i in the tuple. If that field is not a Long, you will get a runtime error.
*/
public Long getLong(int i);
/**
- * Returns the Boolean at position i in the tuple. If that field is not a Boolean,
- * you will get a runtime error.
+ * Returns the Boolean at position i in the tuple. If that field is not a Boolean, you will get a runtime error.
*/
public Boolean getBoolean(int i);
/**
- * Returns the Short at position i in the tuple. If that field is not a Short,
- * you will get a runtime error.
+ * Returns the Short at position i in the tuple. If that field is not a Short, you will get a runtime error.
*/
public Short getShort(int i);
/**
- * Returns the Byte at position i in the tuple. If that field is not a Byte,
- * you will get a runtime error.
+ * Returns the Byte at position i in the tuple. If that field is not a Byte, you will get a runtime error.
*/
public Byte getByte(int i);
/**
- * Returns the Double at position i in the tuple. If that field is not a Double,
- * you will get a runtime error.
+ * Returns the Double at position i in the tuple. If that field is not a Double, you will get a runtime error.
*/
public Double getDouble(int i);
/**
- * Returns the Float at position i in the tuple. If that field is not a Float,
- * you will get a runtime error.
+ * Returns the Float at position i in the tuple. If that field is not a Float, you will get a runtime error.
*/
public Float getFloat(int i);
/**
- * Returns the byte array at position i in the tuple. If that field is not a byte array,
- * you will get a runtime error.
+ * Returns the byte array at position i in the tuple. If that field is not a byte array, you will get a runtime error.
*/
public byte[] getBinary(int i);
-
public Object getValueByField(String field);
public String getStringByField(String field);
@@ -131,6 +121,4 @@ public interface ITuple {
*/
public List<Object> getValues();
-
-
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/tuple/ITupleExt.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/tuple/ITupleExt.java b/jstorm-core/src/main/java/backtype/storm/tuple/ITupleExt.java
new file mode 100644
index 0000000..92a7157
--- /dev/null
+++ b/jstorm-core/src/main/java/backtype/storm/tuple/ITupleExt.java
@@ -0,0 +1,25 @@
+package backtype.storm.tuple;
+
+public interface ITupleExt {
+
+ /**
+ * Get Target TaskId
+ *
+ * @return
+ */
+ int getTargetTaskId();
+
+ void setTargetTaskId(int targetTaskId);
+
+ /**
+ * Get the timeStamp of creating tuple
+ *
+ * @return
+ */
+ long getCreationTimeStamp();
+
+ /*
+ * set ms
+ */
+ void setCreationTimeStamp(long timeStamp);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/tuple/MessageId.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/tuple/MessageId.java b/jstorm-core/src/main/java/backtype/storm/tuple/MessageId.java
index 688946d..329a4ae 100755
--- a/jstorm-core/src/main/java/backtype/storm/tuple/MessageId.java
+++ b/jstorm-core/src/main/java/backtype/storm/tuple/MessageId.java
@@ -29,12 +29,12 @@ import java.util.Set;
public class MessageId {
private Map<Long, Long> _anchorsToIds;
-
+
@Deprecated
public static long generateId() {
return Utils.secureRandomLong();
}
-
+
public static long generateId(Random rand) {
return rand.nextLong();
}
@@ -42,17 +42,17 @@ public class MessageId {
public static MessageId makeUnanchored() {
return makeId(new HashMap<Long, Long>());
}
-
+
public static MessageId makeId(Map<Long, Long> anchorsToIds) {
return new MessageId(anchorsToIds);
}
-
+
public static MessageId makeRootId(long id, long val) {
Map<Long, Long> anchorsToIds = new HashMap<Long, Long>();
anchorsToIds.put(id, val);
return new MessageId(anchorsToIds);
}
-
+
protected MessageId(Map<Long, Long> anchorsToIds) {
_anchorsToIds = anchorsToIds;
}
@@ -63,8 +63,8 @@ public class MessageId {
public Set<Long> getAnchors() {
return _anchorsToIds.keySet();
- }
-
+ }
+
@Override
public int hashCode() {
return _anchorsToIds.hashCode();
@@ -72,7 +72,7 @@ public class MessageId {
@Override
public boolean equals(Object other) {
- if(other instanceof MessageId) {
+ if (other instanceof MessageId) {
return _anchorsToIds.equals(((MessageId) other)._anchorsToIds);
} else {
return false;
@@ -86,7 +86,7 @@ public class MessageId {
public void serialize(Output out) throws IOException {
out.writeInt(_anchorsToIds.size(), true);
- for(Entry<Long, Long> anchorToId: _anchorsToIds.entrySet()) {
+ for (Entry<Long, Long> anchorToId : _anchorsToIds.entrySet()) {
out.writeLong(anchorToId.getKey());
out.writeLong(anchorToId.getValue());
}
@@ -95,7 +95,7 @@ public class MessageId {
public static MessageId deserialize(Input in) throws IOException {
int numAnchors = in.readInt(true);
Map<Long, Long> anchorsToIds = new HashMap<Long, Long>();
- for(int i=0; i<numAnchors; i++) {
+ for (int i = 0; i < numAnchors; i++) {
anchorsToIds.put(in.readLong(), in.readLong());
}
return new MessageId(anchorsToIds);
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/tuple/Tuple.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/tuple/Tuple.java b/jstorm-core/src/main/java/backtype/storm/tuple/Tuple.java
index 34dc61a..95253df 100755
--- a/jstorm-core/src/main/java/backtype/storm/tuple/Tuple.java
+++ b/jstorm-core/src/main/java/backtype/storm/tuple/Tuple.java
@@ -21,38 +21,35 @@ import backtype.storm.generated.GlobalStreamId;
import java.util.List;
/**
- * The tuple is the main data structure in Storm. A tuple is a named list of values,
- * where each value can be any type. Tuples are dynamically typed -- the types of the fields
- * do not need to be declared. Tuples have helper methods like getInteger and getString
- * to get field values without having to cast the result.
+ * The tuple is the main data structure in Storm. A tuple is a named list of values, where each value can be any type. Tuples are dynamically typed -- the types
+ * of the fields do not need to be declared. Tuples have helper methods like getInteger and getString to get field values without having to cast the result.
*
- * Storm needs to know how to serialize all the values in a tuple. By default, Storm
- * knows how to serialize the primitive types, strings, and byte arrays. If you want to
- * use another type, you'll need to implement and register a serializer for that type.
- * See {@link http://github.com/nathanmarz/storm/wiki/Serialization} for more info.
+ * Storm needs to know how to serialize all the values in a tuple. By default, Storm knows how to serialize the primitive types, strings, and byte arrays. If
+ * you want to use another type, you'll need to implement and register a serializer for that type. See {@link http
+ * ://github.com/nathanmarz/storm/wiki/Serialization} for more info.
*/
-public interface Tuple extends ITuple{
+public interface Tuple extends ITuple {
/**
* Returns the global stream id (component + stream) of this tuple.
*/
public GlobalStreamId getSourceGlobalStreamid();
-
+
/**
* Gets the id of the component that created this tuple.
*/
public String getSourceComponent();
-
+
/**
* Gets the id of the task that created this tuple.
*/
public int getSourceTask();
-
+
/**
* Gets the id of the stream that this tuple was emitted to.
*/
public String getSourceStreamId();
-
+
/**
* Gets the message id that associated with this tuple.
*/
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/tuple/TupleExt.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/tuple/TupleExt.java b/jstorm-core/src/main/java/backtype/storm/tuple/TupleExt.java
index 60676c9..8f004cc 100755
--- a/jstorm-core/src/main/java/backtype/storm/tuple/TupleExt.java
+++ b/jstorm-core/src/main/java/backtype/storm/tuple/TupleExt.java
@@ -17,13 +17,6 @@
*/
package backtype.storm.tuple;
-public interface TupleExt extends Tuple {
- /**
- * Get Target TaskId
- *
- * @return
- */
- int getTargetTaskId();
+public interface TupleExt extends Tuple, ITupleExt {
- void setTargetTaskId(int targetTaskId);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/tuple/TupleImpl.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/tuple/TupleImpl.java b/jstorm-core/src/main/java/backtype/storm/tuple/TupleImpl.java
index 818eff1..417774e 100755
--- a/jstorm-core/src/main/java/backtype/storm/tuple/TupleImpl.java
+++ b/jstorm-core/src/main/java/backtype/storm/tuple/TupleImpl.java
@@ -41,31 +41,29 @@ public class TupleImpl extends IndifferentAccessMap implements Seqable, Indexed,
private GeneralTopologyContext context;
private MessageId id;
private IPersistentMap _meta = null;
-
+
public TupleImpl(GeneralTopologyContext context, List<Object> values, int taskId, String streamId, MessageId id) {
this.values = values;
this.taskId = taskId;
this.streamId = streamId;
this.id = id;
this.context = context;
-
+
String componentId = context.getComponentId(taskId);
Fields schema = context.getComponentOutputFields(componentId, streamId);
- if(values.size()!=schema.size()) {
- throw new IllegalArgumentException(
- "Tuple created with wrong number of fields. " +
- "Expected " + schema.size() + " fields but got " +
- values.size() + " fields");
+ if (values.size() != schema.size()) {
+ throw new IllegalArgumentException("Tuple created with wrong number of fields. " + "Expected " + schema.size() + " fields but got " + values.size()
+ + " fields");
}
}
public TupleImpl(GeneralTopologyContext context, List<Object> values, int taskId, String streamId) {
this(context, values, taskId, streamId, MessageId.makeUnanchored());
- }
-
+ }
+
Long _processSampleStartTime = null;
Long _executeSampleStartTime = null;
-
+
public void setProcessSampleStartTime(long ms) {
_processSampleStartTime = ms;
}
@@ -73,7 +71,7 @@ public class TupleImpl extends IndifferentAccessMap implements Seqable, Indexed,
public Long getProcessSampleStartTime() {
return _processSampleStartTime;
}
-
+
public void setExecuteSampleStartTime(long ms) {
_executeSampleStartTime = ms;
}
@@ -81,13 +79,13 @@ public class TupleImpl extends IndifferentAccessMap implements Seqable, Indexed,
public Long getExecuteSampleStartTime() {
return _executeSampleStartTime;
}
-
+
long _outAckVal = 0;
-
+
public void updateAckVal(long val) {
_outAckVal = _outAckVal ^ val;
}
-
+
public long getAckVal() {
return _outAckVal;
}
@@ -95,15 +93,15 @@ public class TupleImpl extends IndifferentAccessMap implements Seqable, Indexed,
public int size() {
return values.size();
}
-
+
public int fieldIndex(String field) {
return getFields().fieldIndex(field);
}
-
+
public boolean contains(String field) {
return getFields().contains(field);
}
-
+
public Object getValue(int i) {
return values.get(i);
}
@@ -143,8 +141,7 @@ public class TupleImpl extends IndifferentAccessMap implements Seqable, Indexed,
public byte[] getBinary(int i) {
return (byte[]) values.get(i);
}
-
-
+
public Object getValueByField(String field) {
return values.get(fieldIndex(field));
}
@@ -184,11 +181,11 @@ public class TupleImpl extends IndifferentAccessMap implements Seqable, Indexed,
public byte[] getBinaryByField(String field) {
return (byte[]) values.get(fieldIndex(field));
}
-
+
public List<Object> getValues() {
return values;
}
-
+
public Fields getFields() {
return context.getComponentOutputFields(getSourceComponent(), getSourceStreamId());
}
@@ -196,37 +193,37 @@ public class TupleImpl extends IndifferentAccessMap implements Seqable, Indexed,
public List<Object> select(Fields selector) {
return getFields().select(selector, values);
}
-
+
public GlobalStreamId getSourceGlobalStreamid() {
return new GlobalStreamId(getSourceComponent(), streamId);
}
-
+
public String getSourceComponent() {
return context.getComponentId(taskId);
}
-
+
public int getSourceTask() {
return taskId;
}
-
+
public String getSourceStreamId() {
return streamId;
}
-
+
public MessageId getMessageId() {
return id;
}
-
+
@Override
public String toString() {
- return "source: " + getSourceComponent() + ":" + taskId + ", stream: " + streamId + ", id: "+ id.toString() + ", " + values.toString();
+ return "source: " + getSourceComponent() + ":" + taskId + ", stream: " + streamId + ", id: " + id.toString() + ", " + values.toString();
}
-
+
@Override
public boolean equals(Object other) {
return this == other;
- }
-
+ }
+
@Override
public int hashCode() {
return System.identityHashCode(this);
@@ -234,25 +231,25 @@ public class TupleImpl extends IndifferentAccessMap implements Seqable, Indexed,
private final Keyword makeKeyword(String name) {
return Keyword.intern(Symbol.create(name));
- }
+ }
/* ILookup */
@Override
public Object valAt(Object o) {
try {
- if(o instanceof Keyword) {
+ if (o instanceof Keyword) {
return getValueByField(((Keyword) o).getName());
- } else if(o instanceof String) {
+ } else if (o instanceof String) {
return getValueByField((String) o);
}
- } catch(IllegalArgumentException e) {
+ } catch (IllegalArgumentException e) {
}
return null;
}
/* Seqable */
public ISeq seq() {
- if(values.size() > 0) {
+ if (values.size() > 0) {
return new Seq(getFields().toList(), values, 0);
}
return null;
@@ -272,7 +269,7 @@ public class TupleImpl extends IndifferentAccessMap implements Seqable, Indexed,
public Seq(IPersistentMap meta, List<String> fields, List<Object> values, int i) {
super(meta);
- this.fields= fields;
+ this.fields = fields;
this.values = values;
assert i >= 0;
this.i = i;
@@ -283,16 +280,16 @@ public class TupleImpl extends IndifferentAccessMap implements Seqable, Indexed,
}
public ISeq next() {
- if(i+1 < fields.size()) {
- return new Seq(fields, values, i+1);
+ if (i + 1 < fields.size()) {
+ return new Seq(fields, values, i + 1);
}
return null;
}
public int count() {
- assert fields.size() -i >= 0 : "index out of bounds";
+ assert fields.size() - i >= 0 : "index out of bounds";
// i being the position in the fields of this seq, the remainder of the seq is the size
- return fields.size() -i;
+ return fields.size() - i;
}
public Obj withMeta(IPersistentMap meta) {
@@ -302,7 +299,7 @@ public class TupleImpl extends IndifferentAccessMap implements Seqable, Indexed,
/* Indexed */
public Object nth(int i) {
- if(i < values.size()) {
+ if (i < values.size()) {
return values.get(i);
} else {
return null;
@@ -311,7 +308,8 @@ public class TupleImpl extends IndifferentAccessMap implements Seqable, Indexed,
public Object nth(int i, Object notfound) {
Object ret = nth(i);
- if(ret==null) ret = notfound;
+ if (ret == null)
+ ret = notfound;
return ret;
}
@@ -319,33 +317,32 @@ public class TupleImpl extends IndifferentAccessMap implements Seqable, Indexed,
public int count() {
return values.size();
}
-
+
/* IMeta */
public IPersistentMap meta() {
- if(_meta==null) {
- _meta = new PersistentArrayMap( new Object[] {
- makeKeyword("stream"), getSourceStreamId(),
- makeKeyword("component"), getSourceComponent(),
- makeKeyword("task"), getSourceTask()});
+ if (_meta == null) {
+ _meta =
+ new PersistentArrayMap(new Object[] { makeKeyword("stream"), getSourceStreamId(), makeKeyword("component"), getSourceComponent(),
+ makeKeyword("task"), getSourceTask() });
}
return _meta;
}
private PersistentArrayMap toMap() {
- Object array[] = new Object[values.size()*2];
+ Object array[] = new Object[values.size() * 2];
List<String> fields = getFields().toList();
- for(int i=0; i < values.size(); i++) {
- array[i*2] = fields.get(i);
- array[(i*2)+1] = values.get(i);
+ for (int i = 0; i < values.size(); i++) {
+ array[i * 2] = fields.get(i);
+ array[(i * 2) + 1] = values.get(i);
}
return new PersistentArrayMap(array);
}
public IPersistentMap getMap() {
- if(_map==null) {
+ if (_map == null) {
setMap(toMap());
}
return _map;
- }
-
+ }
+
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/tuple/TupleImplExt.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/tuple/TupleImplExt.java b/jstorm-core/src/main/java/backtype/storm/tuple/TupleImplExt.java
index 2e966a0..4017769 100755
--- a/jstorm-core/src/main/java/backtype/storm/tuple/TupleImplExt.java
+++ b/jstorm-core/src/main/java/backtype/storm/tuple/TupleImplExt.java
@@ -22,25 +22,36 @@ import java.util.List;
import backtype.storm.task.GeneralTopologyContext;
public class TupleImplExt extends TupleImpl implements TupleExt {
-
+
protected int targetTaskId;
-
+ protected long creationTimeStamp = System.currentTimeMillis();
+
public TupleImplExt(GeneralTopologyContext context, List<Object> values, int taskId, String streamId) {
super(context, values, taskId, streamId);
}
-
+
public TupleImplExt(GeneralTopologyContext context, List<Object> values, int taskId, String streamId, MessageId id) {
super(context, values, taskId, streamId, id);
}
-
+
@Override
public int getTargetTaskId() {
return targetTaskId;
}
-
+
@Override
public void setTargetTaskId(int targetTaskId) {
this.targetTaskId = targetTaskId;
}
-
+
+ @Override
+ public long getCreationTimeStamp() {
+ return creationTimeStamp;
+ }
+
+ @Override
+ public void setCreationTimeStamp(long timeStamp) {
+ this.creationTimeStamp = timeStamp;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/tuple/Values.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/tuple/Values.java b/jstorm-core/src/main/java/backtype/storm/tuple/Values.java
index 41bbc71..c25363b 100755
--- a/jstorm-core/src/main/java/backtype/storm/tuple/Values.java
+++ b/jstorm-core/src/main/java/backtype/storm/tuple/Values.java
@@ -20,17 +20,16 @@ package backtype.storm.tuple;
import java.util.ArrayList;
/**
- * A convenience class for making tuple values using new Values("field1", 2, 3)
- * syntax.
+ * A convenience class for making tuple values using new Values("field1", 2, 3) syntax.
*/
-public class Values extends ArrayList<Object>{
+public class Values extends ArrayList<Object> {
public Values() {
-
+
}
-
+
public Values(Object... vals) {
super(vals.length);
- for(Object o: vals) {
+ for (Object o : vals) {
add(o);
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/utils/BufferFileInputStream.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/utils/BufferFileInputStream.java b/jstorm-core/src/main/java/backtype/storm/utils/BufferFileInputStream.java
index 1311d6d..d9fa692 100755
--- a/jstorm-core/src/main/java/backtype/storm/utils/BufferFileInputStream.java
+++ b/jstorm-core/src/main/java/backtype/storm/utils/BufferFileInputStream.java
@@ -22,7 +22,6 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Arrays;
-
public class BufferFileInputStream {
byte[] buffer;
FileInputStream stream;
@@ -33,15 +32,15 @@ public class BufferFileInputStream {
}
public BufferFileInputStream(String file) throws FileNotFoundException {
- this(file, 15*1024);
+ this(file, 15 * 1024);
}
public byte[] read() throws IOException {
int length = stream.read(buffer);
- if(length==-1) {
+ if (length == -1) {
close();
return new byte[0];
- } else if(length==buffer.length) {
+ } else if (length == buffer.length) {
return buffer;
} else {
return Arrays.copyOf(buffer, length);