You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/11/05 21:41:03 UTC
[24/60] [abbrv] [partial] storm git commit: Release 2.0.4-SNAPSHOT
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/transactional/TransactionalTopologyBuilder.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/transactional/TransactionalTopologyBuilder.java b/jstorm-client/src/main/java/backtype/storm/transactional/TransactionalTopologyBuilder.java
deleted file mode 100644
index b4e437b..0000000
--- a/jstorm-client/src/main/java/backtype/storm/transactional/TransactionalTopologyBuilder.java
+++ /dev/null
@@ -1,566 +0,0 @@
-package backtype.storm.transactional;
-
-import backtype.storm.coordination.IBatchBolt;
-import backtype.storm.coordination.BatchBoltExecutor;
-import backtype.storm.Config;
-import backtype.storm.Constants;
-import backtype.storm.coordination.CoordinatedBolt;
-import backtype.storm.coordination.CoordinatedBolt.IdStreamSpec;
-import backtype.storm.coordination.CoordinatedBolt.SourceArgs;
-import backtype.storm.generated.GlobalStreamId;
-import backtype.storm.generated.Grouping;
-import backtype.storm.generated.StormTopology;
-import backtype.storm.grouping.CustomStreamGrouping;
-import backtype.storm.topology.BaseConfigurationDeclarer;
-import backtype.storm.topology.BasicBoltExecutor;
-import backtype.storm.topology.BoltDeclarer;
-import backtype.storm.topology.IBasicBolt;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.topology.InputDeclarer;
-import backtype.storm.topology.SpoutDeclarer;
-import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.transactional.partitioned.IOpaquePartitionedTransactionalSpout;
-import backtype.storm.transactional.partitioned.IPartitionedTransactionalSpout;
-import backtype.storm.transactional.partitioned.OpaquePartitionedTransactionalSpoutExecutor;
-import backtype.storm.transactional.partitioned.PartitionedTransactionalSpoutExecutor;
-import backtype.storm.tuple.Fields;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-/**
- * Trident subsumes the functionality provided by transactional topologies, so
- * this class is deprecated.
- *
- */
-@Deprecated
-public class TransactionalTopologyBuilder {
- String _id;
- String _spoutId;
- ITransactionalSpout _spout;
- Map<String, Component> _bolts = new HashMap<String, Component>();
- Integer _spoutParallelism;
- List<Map> _spoutConfs = new ArrayList();
-
- // id is used to store the state of this transactional spout in ZooKeeper.
- // It would be very dangerous to have two topologies active with the same id
- // in the same cluster.
- public TransactionalTopologyBuilder(String id, String spoutId,
- ITransactionalSpout spout, Number spoutParallelism) {
- _id = id;
- _spoutId = spoutId;
- _spout = spout;
- _spoutParallelism = (spoutParallelism == null) ? null
- : spoutParallelism.intValue();
- }
-
- public TransactionalTopologyBuilder(String id, String spoutId,
- ITransactionalSpout spout) {
- this(id, spoutId, spout, null);
- }
-
- public TransactionalTopologyBuilder(String id, String spoutId,
- IPartitionedTransactionalSpout spout, Number spoutParallelism) {
- this(id, spoutId, new PartitionedTransactionalSpoutExecutor(spout),
- spoutParallelism);
- }
-
- public TransactionalTopologyBuilder(String id, String spoutId,
- IPartitionedTransactionalSpout spout) {
- this(id, spoutId, spout, null);
- }
-
- public TransactionalTopologyBuilder(String id, String spoutId,
- IOpaquePartitionedTransactionalSpout spout, Number spoutParallelism) {
- this(id, spoutId,
- new OpaquePartitionedTransactionalSpoutExecutor(spout),
- spoutParallelism);
- }
-
- public TransactionalTopologyBuilder(String id, String spoutId,
- IOpaquePartitionedTransactionalSpout spout) {
- this(id, spoutId, spout, null);
- }
-
- public SpoutDeclarer getSpoutDeclarer() {
- return new SpoutDeclarerImpl();
- }
-
- public BoltDeclarer setBolt(String id, IBatchBolt bolt) {
- return setBolt(id, bolt, null);
- }
-
- public BoltDeclarer setBolt(String id, IBatchBolt bolt, Number parallelism) {
- return setBolt(id, new BatchBoltExecutor(bolt), parallelism,
- bolt instanceof ICommitter);
- }
-
- public BoltDeclarer setCommitterBolt(String id, IBatchBolt bolt) {
- return setCommitterBolt(id, bolt, null);
- }
-
- public BoltDeclarer setCommitterBolt(String id, IBatchBolt bolt,
- Number parallelism) {
- return setBolt(id, new BatchBoltExecutor(bolt), parallelism, true);
- }
-
- public BoltDeclarer setBolt(String id, IBasicBolt bolt) {
- return setBolt(id, bolt, null);
- }
-
- public BoltDeclarer setBolt(String id, IBasicBolt bolt, Number parallelism) {
- return setBolt(id, new BasicBoltExecutor(bolt), parallelism, false);
- }
-
- /**
-  * Wraps the bolt in a {@code Component}, stores it in {@code _bolts}
-  * under the given id, and returns a declarer bound to that component.
-  *
-  * @param id          key the component is stored under
-  * @param bolt        bolt to register
-  * @param parallelism parallelism hint; {@code null} is passed through as
-  *                    a {@code null} hint
-  * @param committer   committer flag stored on the component
-  * @return a new {@code BoltDeclarerImpl} for the registered component
-  */
- private BoltDeclarer setBolt(String id, IRichBolt bolt, Number parallelism,
-         boolean committer) {
-     Integer hint = (parallelism == null) ? null : parallelism.intValue();
-     Component registered = new Component(bolt, hint, committer);
-     _bolts.put(id, registered);
-     return new BoltDeclarerImpl(registered);
- }
-
- /**
-  * Assembles a {@link TopologyBuilder} from the spout and bolts registered
-  * on this builder.
-  *
-  * The result contains a coordinator spout registered under
-  * {@code _spoutId + "/coordinator"}, the batch emitter registered as a
-  * bolt under {@code _spoutId}, and every bolt from {@code _bolts} wrapped
-  * in a {@link CoordinatedBolt}.
-  *
-  * @return a new builder populated with the coordinator, emitter and bolts
-  */
- public TopologyBuilder buildTopologyBuilder() {
-     String coordinator = _spoutId + "/coordinator";
-     TopologyBuilder builder = new TopologyBuilder();
-     SpoutDeclarer declarer = builder.setSpout(coordinator,
-             new TransactionalSpoutCoordinator(_spout));
-     for (Map conf : _spoutConfs) {
-         declarer.addConfigurations(conf);
-     }
-     declarer.addConfiguration(Config.TOPOLOGY_TRANSACTIONAL_ID, _id);
-
-     BoltDeclarer emitterDeclarer = builder
-             .setBolt(
-                     _spoutId,
-                     new CoordinatedBolt(
-                             new TransactionalSpoutBatchExecutor(_spout),
-                             null, null), _spoutParallelism)
-             .allGrouping(
-                     coordinator,
-                     TransactionalSpoutCoordinator.TRANSACTION_BATCH_STREAM_ID)
-             .addConfiguration(Config.TOPOLOGY_TRANSACTIONAL_ID, _id);
-     if (_spout instanceof ICommitterTransactionalSpout) {
-         emitterDeclarer.allGrouping(coordinator,
-                 TransactionalSpoutCoordinator.TRANSACTION_COMMIT_STREAM_ID);
-     }
-
-     // Walk entries directly rather than looking each key up again.
-     for (Map.Entry<String, Component> registered : _bolts.entrySet()) {
-         String id = registered.getKey();
-         Component component = registered.getValue();
-
-         // The set of source components is needed twice below (coordination
-         // arguments and direct groupings); nothing in between touches the
-         // component's declarations, so compute it once.
-         Set<String> sources = componentBoltSubscriptions(component);
-
-         Map<String, SourceArgs> coordinatedArgs = new HashMap<String, SourceArgs>();
-         for (String source : sources) {
-             coordinatedArgs.put(source, SourceArgs.all());
-         }
-
-         // Only committer bolts get an id stream spec on the commit stream.
-         IdStreamSpec idSpec = null;
-         if (component.committer) {
-             idSpec = IdStreamSpec
-                     .makeDetectSpec(
-                             coordinator,
-                             TransactionalSpoutCoordinator.TRANSACTION_COMMIT_STREAM_ID);
-         }
-         BoltDeclarer input = builder.setBolt(id, new CoordinatedBolt(
-                 component.bolt, coordinatedArgs, idSpec),
-                 component.parallelism);
-         for (Map conf : component.componentConfs) {
-             input.addConfigurations(conf);
-         }
-         for (String source : sources) {
-             input.directGrouping(source, Constants.COORDINATED_STREAM_ID);
-         }
-         // Replay the groupings the user declared through BoltDeclarerImpl.
-         for (InputDeclaration d : component.declarations) {
-             d.declare(input);
-         }
-         if (component.committer) {
-             input.allGrouping(
-                     coordinator,
-                     TransactionalSpoutCoordinator.TRANSACTION_COMMIT_STREAM_ID);
-         }
-     }
-     return builder;
- }
-
- public StormTopology buildTopology() {
- return buildTopologyBuilder().createTopology();
- }
-
- /**
-  * Collects the distinct component ids named by the given component's
-  * input declarations.
-  *
-  * @param component component whose declarations are inspected
-  * @return a new set holding {@code getComponent()} of every declaration
-  */
- private Set<String> componentBoltSubscriptions(Component component) {
-     Set<String> sourceIds = new HashSet<String>();
-     for (InputDeclaration declaration : component.declarations) {
-         String sourceId = declaration.getComponent();
-         sourceIds.add(sourceId);
-     }
-     return sourceIds;
- }
-
- private static class Component {
- public IRichBolt bolt;
- public Integer parallelism;
- public List<InputDeclaration> declarations = new ArrayList<InputDeclaration>();
- public List<Map> componentConfs = new ArrayList<Map>();
- public boolean committer;
-
- public Component(IRichBolt bolt, Integer parallelism, boolean committer) {
- this.bolt = bolt;
- this.parallelism = parallelism;
- this.committer = committer;
- }
- }
-
- private static interface InputDeclaration {
- void declare(InputDeclarer declarer);
-
- String getComponent();
- }
-
- private class SpoutDeclarerImpl extends
- BaseConfigurationDeclarer<SpoutDeclarer> implements SpoutDeclarer {
- @Override
- public SpoutDeclarer addConfigurations(Map conf) {
- _spoutConfs.add(conf);
- return this;
- }
- }
-
- private class BoltDeclarerImpl extends
- BaseConfigurationDeclarer<BoltDeclarer> implements BoltDeclarer {
- Component _component;
-
- public BoltDeclarerImpl(Component component) {
- _component = component;
- }
-
- @Override
- public BoltDeclarer fieldsGrouping(final String component,
- final Fields fields) {
- addDeclaration(new InputDeclaration() {
- @Override
- public void declare(InputDeclarer declarer) {
- declarer.fieldsGrouping(component, fields);
- }
-
- @Override
- public String getComponent() {
- return component;
- }
- });
- return this;
- }
-
- @Override
- public BoltDeclarer fieldsGrouping(final String component,
- final String streamId, final Fields fields) {
- addDeclaration(new InputDeclaration() {
- @Override
- public void declare(InputDeclarer declarer) {
- declarer.fieldsGrouping(component, streamId, fields);
- }
-
- @Override
- public String getComponent() {
- return component;
- }
- });
- return this;
- }
-
- @Override
- public BoltDeclarer globalGrouping(final String component) {
- addDeclaration(new InputDeclaration() {
- @Override
- public void declare(InputDeclarer declarer) {
- declarer.globalGrouping(component);
- }
-
- @Override
- public String getComponent() {
- return component;
- }
- });
- return this;
- }
-
- @Override
- public BoltDeclarer globalGrouping(final String component,
- final String streamId) {
- addDeclaration(new InputDeclaration() {
- @Override
- public void declare(InputDeclarer declarer) {
- declarer.globalGrouping(component, streamId);
- }
-
- @Override
- public String getComponent() {
- return component;
- }
- });
- return this;
- }
-
- @Override
- public BoltDeclarer shuffleGrouping(final String component) {
- addDeclaration(new InputDeclaration() {
- @Override
- public void declare(InputDeclarer declarer) {
- declarer.shuffleGrouping(component);
- }
-
- @Override
- public String getComponent() {
- return component;
- }
- });
- return this;
- }
-
- @Override
- public BoltDeclarer shuffleGrouping(final String component,
- final String streamId) {
- addDeclaration(new InputDeclaration() {
- @Override
- public void declare(InputDeclarer declarer) {
- declarer.shuffleGrouping(component, streamId);
- }
-
- @Override
- public String getComponent() {
- return component;
- }
- });
- return this;
- }
-
- @Override
- public BoltDeclarer localOrShuffleGrouping(final String component) {
- addDeclaration(new InputDeclaration() {
- @Override
- public void declare(InputDeclarer declarer) {
- declarer.localOrShuffleGrouping(component);
- }
-
- @Override
- public String getComponent() {
- return component;
- }
- });
- return this;
- }
-
- @Override
- public BoltDeclarer localOrShuffleGrouping(final String component,
- final String streamId) {
- addDeclaration(new InputDeclaration() {
- @Override
- public void declare(InputDeclarer declarer) {
- declarer.localOrShuffleGrouping(component, streamId);
- }
-
- @Override
- public String getComponent() {
- return component;
- }
- });
- return this;
- }
-
- @Override
- public BoltDeclarer localFirstGrouping(final String component) {
- addDeclaration(new InputDeclaration() {
- @Override
- public void declare(InputDeclarer declarer) {
- declarer.localFirstGrouping(component);
- }
-
- @Override
- public String getComponent() {
- return component;
- }
- });
- return this;
- }
-
- @Override
- public BoltDeclarer localFirstGrouping(final String component,
- final String streamId) {
- addDeclaration(new InputDeclaration() {
- @Override
- public void declare(InputDeclarer declarer) {
- declarer.localFirstGrouping(component, streamId);
- }
-
- @Override
- public String getComponent() {
- return component;
- }
- });
- return this;
- }
-
- @Override
- public BoltDeclarer noneGrouping(final String component) {
- addDeclaration(new InputDeclaration() {
- @Override
- public void declare(InputDeclarer declarer) {
- declarer.noneGrouping(component);
- }
-
- @Override
- public String getComponent() {
- return component;
- }
- });
- return this;
- }
-
- @Override
- public BoltDeclarer noneGrouping(final String component,
- final String streamId) {
- addDeclaration(new InputDeclaration() {
- @Override
- public void declare(InputDeclarer declarer) {
- declarer.noneGrouping(component, streamId);
- }
-
- @Override
- public String getComponent() {
- return component;
- }
- });
- return this;
- }
-
- @Override
- public BoltDeclarer allGrouping(final String component) {
- addDeclaration(new InputDeclaration() {
- @Override
- public void declare(InputDeclarer declarer) {
- declarer.allGrouping(component);
- }
-
- @Override
- public String getComponent() {
- return component;
- }
- });
- return this;
- }
-
- @Override
- public BoltDeclarer allGrouping(final String component,
- final String streamId) {
- addDeclaration(new InputDeclaration() {
- @Override
- public void declare(InputDeclarer declarer) {
- declarer.allGrouping(component, streamId);
- }
-
- @Override
- public String getComponent() {
- return component;
- }
- });
- return this;
- }
-
- @Override
- public BoltDeclarer directGrouping(final String component) {
- addDeclaration(new InputDeclaration() {
- @Override
- public void declare(InputDeclarer declarer) {
- declarer.directGrouping(component);
- }
-
- @Override
- public String getComponent() {
- return component;
- }
- });
- return this;
- }
-
- @Override
- public BoltDeclarer directGrouping(final String component,
- final String streamId) {
- addDeclaration(new InputDeclaration() {
- @Override
- public void declare(InputDeclarer declarer) {
- declarer.directGrouping(component, streamId);
- }
-
- @Override
- public String getComponent() {
- return component;
- }
- });
- return this;
- }
-
- @Override
- public BoltDeclarer customGrouping(final String component,
- final CustomStreamGrouping grouping) {
- addDeclaration(new InputDeclaration() {
- @Override
- public void declare(InputDeclarer declarer) {
- declarer.customGrouping(component, grouping);
- }
-
- @Override
- public String getComponent() {
- return component;
- }
- });
- return this;
- }
-
- @Override
- public BoltDeclarer customGrouping(final String component,
- final String streamId, final CustomStreamGrouping grouping) {
- addDeclaration(new InputDeclaration() {
- @Override
- public void declare(InputDeclarer declarer) {
- declarer.customGrouping(component, streamId, grouping);
- }
-
- @Override
- public String getComponent() {
- return component;
- }
- });
- return this;
- }
-
- @Override
- public BoltDeclarer grouping(final GlobalStreamId stream,
- final Grouping grouping) {
- addDeclaration(new InputDeclaration() {
- @Override
- public void declare(InputDeclarer declarer) {
- declarer.grouping(stream, grouping);
- }
-
- @Override
- public String getComponent() {
- return stream.get_componentId();
- }
- });
- return this;
- }
-
- private void addDeclaration(InputDeclaration declaration) {
- _component.declarations.add(declaration);
- }
-
- @Override
- public BoltDeclarer addConfigurations(Map conf) {
- _component.componentConfs.add(conf);
- return this;
- }
-
-
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/transactional/partitioned/IOpaquePartitionedTransactionalSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/transactional/partitioned/IOpaquePartitionedTransactionalSpout.java b/jstorm-client/src/main/java/backtype/storm/transactional/partitioned/IOpaquePartitionedTransactionalSpout.java
deleted file mode 100644
index 65c0772..0000000
--- a/jstorm-client/src/main/java/backtype/storm/transactional/partitioned/IOpaquePartitionedTransactionalSpout.java
+++ /dev/null
@@ -1,46 +0,0 @@
-package backtype.storm.transactional.partitioned;
-
-import backtype.storm.coordination.BatchOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IComponent;
-import backtype.storm.transactional.TransactionAttempt;
-import java.util.Map;
-
-/**
- * This defines a transactional spout which does *not* necessarily replay the
- * same batch every time it emits a batch for a transaction id.
- */
-public interface IOpaquePartitionedTransactionalSpout<T> extends IComponent {
- public interface Coordinator {
- /**
- * Returns true if it's OK to start a new transaction, false
- * otherwise (will skip this transaction).
- *
- * You should sleep here if you want a delay between asking for the next
- * transaction (this will be called repeatedly in a loop).
- */
- boolean isReady();
-
- void close();
- }
-
- public interface Emitter<X> {
- /**
- * Emit a batch of tuples for a partition/transaction.
- *
- * Return the metadata describing this batch that will be used as
- * lastPartitionMeta for defining the parameters of the next batch.
- */
- X emitPartitionBatch(TransactionAttempt tx,
- BatchOutputCollector collector, int partition,
- X lastPartitionMeta);
-
- int numPartitions();
-
- void close();
- }
-
- Emitter<T> getEmitter(Map conf, TopologyContext context);
-
- Coordinator getCoordinator(Map conf, TopologyContext context);
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/transactional/partitioned/IPartitionedTransactionalSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/transactional/partitioned/IPartitionedTransactionalSpout.java b/jstorm-client/src/main/java/backtype/storm/transactional/partitioned/IPartitionedTransactionalSpout.java
deleted file mode 100644
index 31e4c41..0000000
--- a/jstorm-client/src/main/java/backtype/storm/transactional/partitioned/IPartitionedTransactionalSpout.java
+++ /dev/null
@@ -1,60 +0,0 @@
-package backtype.storm.transactional.partitioned;
-
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IComponent;
-import backtype.storm.transactional.TransactionAttempt;
-import backtype.storm.coordination.BatchOutputCollector;
-import java.util.Map;
-
-/**
- * This interface defines a transactional spout that reads its tuples from a
- * partitioned set of brokers. It automates the storing of metadata for each
- * partition to ensure that the same batch is always emitted for the same
- * transaction id. The partition metadata is stored in Zookeeper.
- */
-public interface IPartitionedTransactionalSpout<T> extends IComponent {
- public interface Coordinator {
- /**
- * Return the number of partitions currently in the source of data. The
- * idea is that if a new partition is added and a prior transaction
- * is replayed, it doesn't emit tuples for the new partition because it
- * knows how many partitions were in that transaction.
- */
- int numPartitions();
-
- /**
- * Returns true if it's OK to start a new transaction, false
- * otherwise (will skip this transaction).
- *
- * You should sleep here if you want a delay between asking for the next
- * transaction (this will be called repeatedly in a loop).
- */
- boolean isReady();
-
- void close();
- }
-
- public interface Emitter<X> {
- /**
- * Emit a batch of tuples for a partition/transaction that's never been
- * emitted before. Return the metadata that can be used to reconstruct
- * this partition/batch in the future.
- */
- X emitPartitionBatchNew(TransactionAttempt tx,
- BatchOutputCollector collector, int partition,
- X lastPartitionMeta);
-
- /**
- * Emit a batch of tuples for a partition/transaction that has been
- * emitted before, using the metadata created when it was first emitted.
- */
- void emitPartitionBatch(TransactionAttempt tx,
- BatchOutputCollector collector, int partition, X partitionMeta);
-
- void close();
- }
-
- Coordinator getCoordinator(Map conf, TopologyContext context);
-
- Emitter<T> getEmitter(Map conf, TopologyContext context);
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/transactional/partitioned/OpaquePartitionedTransactionalSpoutExecutor.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/transactional/partitioned/OpaquePartitionedTransactionalSpoutExecutor.java b/jstorm-client/src/main/java/backtype/storm/transactional/partitioned/OpaquePartitionedTransactionalSpoutExecutor.java
deleted file mode 100644
index 4bc877f..0000000
--- a/jstorm-client/src/main/java/backtype/storm/transactional/partitioned/OpaquePartitionedTransactionalSpoutExecutor.java
+++ /dev/null
@@ -1,153 +0,0 @@
-package backtype.storm.transactional.partitioned;
-
-import backtype.storm.Config;
-import backtype.storm.coordination.BatchOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.transactional.ICommitterTransactionalSpout;
-import backtype.storm.transactional.ITransactionalSpout;
-import backtype.storm.transactional.TransactionAttempt;
-import backtype.storm.transactional.state.RotatingTransactionalState;
-import backtype.storm.transactional.state.TransactionalState;
-import java.math.BigInteger;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.TreeMap;
-
-public class OpaquePartitionedTransactionalSpoutExecutor implements
- ICommitterTransactionalSpout<Object> {
- IOpaquePartitionedTransactionalSpout _spout;
-
- public class Coordinator implements ITransactionalSpout.Coordinator<Object> {
- IOpaquePartitionedTransactionalSpout.Coordinator _coordinator;
-
- public Coordinator(Map conf, TopologyContext context) {
- _coordinator = _spout.getCoordinator(conf, context);
- }
-
- @Override
- public Object initializeTransaction(BigInteger txid, Object prevMetadata) {
- return null;
- }
-
- @Override
- public boolean isReady() {
- return _coordinator.isReady();
- }
-
- @Override
- public void close() {
- _coordinator.close();
- }
- }
-
- public class Emitter implements ICommitterTransactionalSpout.Emitter {
- IOpaquePartitionedTransactionalSpout.Emitter _emitter;
- TransactionalState _state;
- TreeMap<BigInteger, Map<Integer, Object>> _cachedMetas = new TreeMap<BigInteger, Map<Integer, Object>>();
- Map<Integer, RotatingTransactionalState> _partitionStates = new HashMap<Integer, RotatingTransactionalState>();
- int _index;
- int _numTasks;
-
- /**
-  * Creates the wrapped emitter, opens the user state for this
-  * transactional id, and builds a {@code RotatingTransactionalState}
-  * for each already-listed partition that maps to this task.
-  *
-  * A partition maps to this task when
-  * {@code (partition - taskIndex) % numTasks == 0}.
-  *
-  * @param conf    topology configuration; must contain
-  *                {@code Config.TOPOLOGY_TRANSACTIONAL_ID}
-  * @param context context supplying this task's index and the task count
-  *                of this component
-  */
- public Emitter(Map conf, TopologyContext context) {
-     _emitter = _spout.getEmitter(conf, context);
-     _index = context.getThisTaskIndex();
-     String componentId = context.getThisComponentId();
-     _numTasks = context.getComponentTasks(componentId).size();
-
-     String transactionalId = (String) conf
-             .get(Config.TOPOLOGY_TRANSACTIONAL_ID);
-     _state = TransactionalState.newUserState(conf, transactionalId,
-             getComponentConfiguration());
-
-     // Entries listed at the state root are parsed as partition numbers.
-     for (String partitionName : _state.list("")) {
-         int partition = Integer.parseInt(partitionName);
-         boolean handledHere = (partition - _index) % _numTasks == 0;
-         if (!handledHere) {
-             continue;
-         }
-         _partitionStates.put(partition,
-                 new RotatingTransactionalState(_state, partitionName));
-     }
- }
-
- /**
-  * Emits one batch for every partition handled by this task (partitions
-  * {@code _index, _index + _numTasks, ...} below the emitter's partition
-  * count) and caches the metadata each emit returns under the
-  * transaction id.
-  *
-  * For each partition the "last" metadata passed to the emitter is the
-  * value cached for the closest lower transaction id, falling back to
-  * the partition state's last stored value when none is cached.
-  *
-  * @param tx              the transaction attempt being emitted
-  * @param coordinatorMeta unused by this implementation
-  * @param collector       collector handed through to the wrapped emitter
-  */
- @Override
- public void emitBatch(TransactionAttempt tx, Object coordinatorMeta,
-         BatchOutputCollector collector) {
-     BigInteger txid = tx.getTransactionId();
-
-     // Register the (still empty) meta map first so commit() can find it.
-     Map<Integer, Object> emitted = new HashMap<Integer, Object>();
-     _cachedMetas.put(txid, emitted);
-
-     int partitionCount = _emitter.numPartitions();
-
-     // lowerEntry is strict, so the map just inserted is never picked.
-     Entry<BigInteger, Map<Integer, Object>> earlier = _cachedMetas
-             .lowerEntry(txid);
-     Map<Integer, Object> earlierMetas = (earlier != null)
-             ? earlier.getValue()
-             : new HashMap<Integer, Object>();
-
-     for (int partition = _index; partition < partitionCount; partition += _numTasks) {
-         RotatingTransactionalState partitionState = _partitionStates
-                 .get(partition);
-         if (partitionState == null) {
-             partitionState = new RotatingTransactionalState(_state, ""
-                     + partition);
-             _partitionStates.put(partition, partitionState);
-         }
-         // Drop anything stored for this txid before re-emitting it.
-         partitionState.removeState(txid);
-
-         Object lastMeta = earlierMetas.get(partition);
-         if (lastMeta == null) {
-             lastMeta = partitionState.getLastState();
-         }
-         Object newMeta = _emitter.emitPartitionBatch(tx, collector,
-                 partition, lastMeta);
-         emitted.put(partition, newMeta);
-     }
- }
-
- @Override
- public void cleanupBefore(BigInteger txid) {
- for (RotatingTransactionalState state : _partitionStates.values()) {
- state.cleanupBefore(txid);
- }
- }
-
- /**
-  * Removes the metadata cached for the attempt's transaction id and
-  * writes each partition's value into that partition's state via
-  * {@code overrideState}.
-  *
-  * @param attempt the transaction attempt being committed; a cache entry
-  *                for its transaction id is expected to exist
-  */
- @Override
- public void commit(TransactionAttempt attempt) {
-     BigInteger txid = attempt.getTransactionId();
-     Map<Integer, Object> committed = _cachedMetas.remove(txid);
-     for (Entry<Integer, Object> perPartition : committed.entrySet()) {
-         RotatingTransactionalState partitionState = _partitionStates
-                 .get(perPartition.getKey());
-         partitionState.overrideState(txid, perPartition.getValue());
-     }
- }
-
- /**
-  * Releases the resources held by this emitter: the transactional state
-  * opened in the constructor and the wrapped emitter.
-  *
-  * The state is closed as well so this class matches
-  * {@code PartitionedTransactionalSpoutExecutor.Emitter.close()}, which
-  * closes its state before its emitter. The wrapped emitter is closed
-  * even if closing the state throws.
-  */
- @Override
- public void close() {
-     try {
-         _state.close();
-     } finally {
-         _emitter.close();
-     }
- }
- }
-
- public OpaquePartitionedTransactionalSpoutExecutor(
- IOpaquePartitionedTransactionalSpout spout) {
- _spout = spout;
- }
-
- @Override
- public ITransactionalSpout.Coordinator<Object> getCoordinator(Map conf,
- TopologyContext context) {
- return new Coordinator(conf, context);
- }
-
- @Override
- public ICommitterTransactionalSpout.Emitter getEmitter(Map conf,
- TopologyContext context) {
- return new Emitter(conf, context);
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- _spout.declareOutputFields(declarer);
- }
-
- @Override
- public Map<String, Object> getComponentConfiguration() {
- return _spout.getComponentConfiguration();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/transactional/partitioned/PartitionedTransactionalSpoutExecutor.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/transactional/partitioned/PartitionedTransactionalSpoutExecutor.java b/jstorm-client/src/main/java/backtype/storm/transactional/partitioned/PartitionedTransactionalSpoutExecutor.java
deleted file mode 100644
index 51bb34e..0000000
--- a/jstorm-client/src/main/java/backtype/storm/transactional/partitioned/PartitionedTransactionalSpoutExecutor.java
+++ /dev/null
@@ -1,136 +0,0 @@
-package backtype.storm.transactional.partitioned;
-
-import backtype.storm.Config;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.transactional.ITransactionalSpout;
-import backtype.storm.transactional.TransactionAttempt;
-import backtype.storm.coordination.BatchOutputCollector;
-import backtype.storm.transactional.state.RotatingTransactionalState;
-import backtype.storm.transactional.state.TransactionalState;
-import java.math.BigInteger;
-import java.util.HashMap;
-import java.util.Map;
-
-public class PartitionedTransactionalSpoutExecutor implements
- ITransactionalSpout<Integer> {
- IPartitionedTransactionalSpout _spout;
-
- public PartitionedTransactionalSpoutExecutor(
- IPartitionedTransactionalSpout spout) {
- _spout = spout;
- }
-
- public IPartitionedTransactionalSpout getPartitionedSpout() {
- return _spout;
- }
-
- class Coordinator implements ITransactionalSpout.Coordinator<Integer> {
- private IPartitionedTransactionalSpout.Coordinator _coordinator;
-
- public Coordinator(Map conf, TopologyContext context) {
- _coordinator = _spout.getCoordinator(conf, context);
- }
-
- @Override
- public Integer initializeTransaction(BigInteger txid,
- Integer prevMetadata) {
- return _coordinator.numPartitions();
- }
-
- @Override
- public boolean isReady() {
- return _coordinator.isReady();
- }
-
- @Override
- public void close() {
- _coordinator.close();
- }
- }
-
- class Emitter implements ITransactionalSpout.Emitter<Integer> {
- private IPartitionedTransactionalSpout.Emitter _emitter;
- private TransactionalState _state;
- private Map<Integer, RotatingTransactionalState> _partitionStates = new HashMap<Integer, RotatingTransactionalState>();
- private int _index;
- private int _numTasks;
-
- public Emitter(Map conf, TopologyContext context) {
- _emitter = _spout.getEmitter(conf, context);
- _state = TransactionalState.newUserState(conf,
- (String) conf.get(Config.TOPOLOGY_TRANSACTIONAL_ID),
- getComponentConfiguration());
- _index = context.getThisTaskIndex();
- _numTasks = context.getComponentTasks(context.getThisComponentId())
- .size();
- }
-
- /**
-  * Emits the batch for every partition handled by this task, i.e.
-  * partitions {@code _index, _index + _numTasks, ...} below
-  * {@code partitions}.
-  *
-  * @param tx         the transaction attempt being emitted
-  * @param partitions partition count supplied for this transaction
-  * @param collector  collector handed through to the wrapped emitter
-  */
- @Override
- public void emitBatch(final TransactionAttempt tx,
-         final Integer partitions, final BatchOutputCollector collector) {
-     for (int i = _index; i < partitions; i += _numTasks) {
-         final int partition = i;
-
-         // Lazily create the per-partition state on first use.
-         RotatingTransactionalState partitionState = _partitionStates
-                 .get(partition);
-         if (partitionState == null) {
-             partitionState = new RotatingTransactionalState(_state, ""
-                     + partition);
-             _partitionStates.put(partition, partitionState);
-         }
-
-         // Invoked by the state when it has to create the entry: emits
-         // the batch for the first time and returns its metadata.
-         RotatingTransactionalState.StateInitializer firstEmit =
-                 new RotatingTransactionalState.StateInitializer() {
-                     @Override
-                     public Object init(BigInteger txid, Object lastState) {
-                         return _emitter.emitPartitionBatchNew(tx,
-                                 collector, partition, lastState);
-                     }
-                 };
-         Object meta = partitionState.getStateOrCreate(
-                 tx.getTransactionId(), firstEmit);
-
-         // meta is null in one of two cases:
-         // a) a later transaction batch was emitted before this one, so
-         //    this batch should be skipped;
-         // b) the state didn't exist and was just created, in which case
-         //    the StateInitializer was invoked and already emitted it.
-         if (meta == null) {
-             continue;
-         }
-         _emitter.emitPartitionBatch(tx, collector, partition, meta);
-     }
-
- }
-
- @Override
- public void cleanupBefore(BigInteger txid) {
- for (RotatingTransactionalState state : _partitionStates.values()) {
- state.cleanupBefore(txid);
- }
- }
-
- @Override
- public void close() {
- _state.close();
- _emitter.close();
- }
- }
-
- @Override
- public ITransactionalSpout.Coordinator getCoordinator(Map conf,
- TopologyContext context) {
- return new Coordinator(conf, context);
- }
-
- @Override
- public ITransactionalSpout.Emitter getEmitter(Map conf,
- TopologyContext context) {
- return new Emitter(conf, context);
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- _spout.declareOutputFields(declarer);
- }
-
- @Override
- public Map<String, Object> getComponentConfiguration() {
- return _spout.getComponentConfiguration();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/transactional/state/RotatingTransactionalState.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/transactional/state/RotatingTransactionalState.java b/jstorm-client/src/main/java/backtype/storm/transactional/state/RotatingTransactionalState.java
deleted file mode 100644
index 2ee9f85..0000000
--- a/jstorm-client/src/main/java/backtype/storm/transactional/state/RotatingTransactionalState.java
+++ /dev/null
@@ -1,143 +0,0 @@
-package backtype.storm.transactional.state;
-
-import backtype.storm.transactional.TransactionalSpoutCoordinator;
-
-import java.math.BigInteger;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.SortedMap;
-import java.util.TreeMap;
-
-/**
- * Sorted txid-to-value view of the entries stored under one subdirectory of a
- * {@link TransactionalState}. Every mutation is applied both to the in-memory
- * map and to the backing state.
- */
-public class RotatingTransactionalState {
-    /** Builds the value for a txid that has no entry yet. */
-    public static interface StateInitializer {
-        Object init(BigInteger txid, Object lastState);
-    }
-
-    private TransactionalState _state;
-    private String _subdir;
-    private boolean _strictOrder;
-
-    private TreeMap<BigInteger, Object> _curr = new TreeMap<BigInteger, Object>();
-
-    /**
-     * Creates the subdirectory in {@code state} and loads the entries that
-     * are already stored there.
-     */
-    public RotatingTransactionalState(TransactionalState state, String subdir,
-            boolean strictOrder) {
-        _state = state;
-        _subdir = subdir;
-        _strictOrder = strictOrder;
-        state.mkdir(subdir);
-        sync();
-    }
-
-    /** Same as the three-argument constructor with strict ordering off. */
-    public RotatingTransactionalState(TransactionalState state, String subdir) {
-        this(state, subdir, false);
-    }
-
-    /** Returns the value of the highest txid, or null when nothing is held. */
-    public Object getLastState() {
-        return _curr.isEmpty() ? null : _curr.lastEntry().getValue();
-    }
-
-    /** Unconditionally stores {@code state} for {@code txid}. */
-    public void overrideState(BigInteger txid, Object state) {
-        _state.setData(txPath(txid), state);
-        _curr.put(txid, state);
-    }
-
-    /** Drops the entry for {@code txid}; a no-op when it is not held. */
-    public void removeState(BigInteger txid) {
-        if (!_curr.containsKey(txid)) {
-            return;
-        }
-        _curr.remove(txid);
-        _state.delete(txPath(txid));
-    }
-
-    /**
-     * Returns the value for {@code txid}, creating and persisting it first
-     * when absent. A new value comes from {@code init} unless a later txid
-     * already has an entry, in which case null is stored instead.
-     *
-     * @throws IllegalStateException in strict-order mode when the txid is
-     *         not the direct successor of the latest stored txid
-     */
-    public Object getState(BigInteger txid, StateInitializer init) {
-        if (_curr.containsKey(txid)) {
-            return _curr.get(txid);
-        }
-
-        SortedMap<BigInteger, Object> earlier = _curr.headMap(txid);
-        SortedMap<BigInteger, Object> later = _curr.tailMap(txid);
-        BigInteger prev = earlier.isEmpty() ? null : earlier.lastKey();
-
-        if (_strictOrder) {
-            if (prev == null) {
-                if (!txid.equals(TransactionalSpoutCoordinator.INIT_TXID)) {
-                    throw new IllegalStateException(
-                            "Trying to initialize transaction for which there should be a previous state");
-                }
-            } else if (!prev.equals(txid.subtract(BigInteger.ONE))) {
-                throw new IllegalStateException(
-                        "Expecting previous txid state to be the previous transaction");
-            }
-            if (!later.isEmpty()) {
-                throw new IllegalStateException(
-                        "Expecting tx state to be initialized in strict order but there are txids after that have state");
-            }
-        }
-
-        Object created = null;
-        if (later.isEmpty()) {
-            Object prevData = (prev == null) ? null : _curr.get(prev);
-            created = init.init(txid, prevData);
-        }
-        // memory first, then the backing store
-        _curr.put(txid, created);
-        _state.setData(txPath(txid), created);
-        return _curr.get(txid);
-    }
-
-    /** Tells whether an entry for {@code txid} is held in memory. */
-    public boolean hasCache(BigInteger txid) {
-        return _curr.containsKey(txid);
-    }
-
-    /**
-     * Returns the existing value for {@code txid}; when there is none, the
-     * entry is created through {@link #getState} and null is returned.
-     */
-    public Object getStateOrCreate(BigInteger txid, StateInitializer init) {
-        if (!_curr.containsKey(txid)) {
-            getState(txid, init);
-            return null;
-        }
-        return _curr.get(txid);
-    }
-
-    /** Removes every entry whose txid is strictly lower than {@code txid}. */
-    public void cleanupBefore(BigInteger txid) {
-        // copy the keys: headMap is a live view of the map being modified
-        Set<BigInteger> stale = new HashSet<BigInteger>();
-        stale.addAll(_curr.headMap(txid).keySet());
-        for (BigInteger old : stale) {
-            _curr.remove(old);
-            _state.delete(txPath(old));
-        }
-    }
-
-    /** Loads every child of the subdirectory into the in-memory map. */
-    private void sync() {
-        List<String> children = _state.list(_subdir);
-        for (String child : children) {
-            Object stored = _state.getData(txPath(child));
-            _curr.put(new BigInteger(child), stored);
-        }
-    }
-
-    private String txPath(BigInteger tx) {
-        return txPath(tx.toString());
-    }
-
-    private String txPath(String tx) {
-        return _subdir + "/" + tx;
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/transactional/state/TransactionalState.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/transactional/state/TransactionalState.java b/jstorm-client/src/main/java/backtype/storm/transactional/state/TransactionalState.java
deleted file mode 100644
index 11b8359..0000000
--- a/jstorm-client/src/main/java/backtype/storm/transactional/state/TransactionalState.java
+++ /dev/null
@@ -1,132 +0,0 @@
-package backtype.storm.transactional.state;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-
-import backtype.storm.Config;
-import backtype.storm.serialization.KryoValuesDeserializer;
-import backtype.storm.serialization.KryoValuesSerializer;
-import backtype.storm.utils.Utils;
-
-public class TransactionalState {
- CuratorFramework _curator;
- KryoValuesSerializer _ser;
- KryoValuesDeserializer _des;
-
- public static TransactionalState newUserState(Map conf, String id,
- Map componentConf) {
- return new TransactionalState(conf, id, componentConf, "user");
- }
-
- public static TransactionalState newCoordinatorState(Map conf, String id,
- Map componentConf) {
- return new TransactionalState(conf, id, componentConf, "coordinator");
- }
-
- protected TransactionalState(Map conf, String id, Map componentConf,
- String subroot) {
- try {
- conf = new HashMap(conf);
- // ensure that the serialization registrations are consistent with
- // the declarations in this spout
- if (componentConf != null) {
- conf.put(Config.TOPOLOGY_KRYO_REGISTER,
- componentConf.get(Config.TOPOLOGY_KRYO_REGISTER));
- }
- String rootDir = conf.get(Config.TRANSACTIONAL_ZOOKEEPER_ROOT)
- + "/" + id + "/" + subroot;
- List<String> servers = (List<String>) getWithBackup(conf,
- Config.TRANSACTIONAL_ZOOKEEPER_SERVERS,
- Config.STORM_ZOOKEEPER_SERVERS);
- Object port = getWithBackup(conf,
- Config.TRANSACTIONAL_ZOOKEEPER_PORT,
- Config.STORM_ZOOKEEPER_PORT);
- CuratorFramework initter = Utils.newCuratorStarted(conf, servers,
- port);
- try {
- initter.create().creatingParentsIfNeeded().forPath(rootDir);
- } catch (KeeperException.NodeExistsException e) {
-
- }
-
- initter.close();
-
- _curator = Utils.newCuratorStarted(conf, servers, port, rootDir);
- _ser = new KryoValuesSerializer(conf);
- _des = new KryoValuesDeserializer(conf);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- public void setData(String path, Object obj) {
- path = "/" + path;
- byte[] ser = _ser.serializeObject(obj);
- try {
- if (_curator.checkExists().forPath(path) != null) {
- _curator.setData().forPath(path, ser);
- } else {
- _curator.create().creatingParentsIfNeeded()
- .withMode(CreateMode.PERSISTENT).forPath(path, ser);
- }
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- public void delete(String path) {
- path = "/" + path;
- try {
- _curator.delete().forPath(path);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- public List<String> list(String path) {
- path = "/" + path;
- try {
- if (_curator.checkExists().forPath(path) == null) {
- return new ArrayList<String>();
- } else {
- return _curator.getChildren().forPath(path);
- }
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- public void mkdir(String path) {
- setData(path, 7);
- }
-
- public Object getData(String path) {
- path = "/" + path;
- try {
- if (_curator.checkExists().forPath(path) != null) {
- return _des.deserializeObject(_curator.getData().forPath(path));
- } else {
- return null;
- }
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- public void close() {
- _curator.close();
- }
-
- private Object getWithBackup(Map amap, Object primary, Object backup) {
- Object ret = amap.get(primary);
- if (ret == null)
- return amap.get(backup);
- return ret;
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/tuple/Fields.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/tuple/Fields.java b/jstorm-client/src/main/java/backtype/storm/tuple/Fields.java
deleted file mode 100644
index dc9b8bf..0000000
--- a/jstorm-client/src/main/java/backtype/storm/tuple/Fields.java
+++ /dev/null
@@ -1,82 +0,0 @@
-package backtype.storm.tuple;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.io.Serializable;
-
-/**
- * An ordered list of unique field names together with a name-to-position
- * index.
- */
-public class Fields implements Iterable<String>, Serializable {
-    private List<String> _fields;
-    private Map<String, Integer> _index = new HashMap<String, Integer>();
-
-    public Fields(String... fields) {
-        this(Arrays.asList(fields));
-    }
-
-    /**
-     * @param fields field names in positional order
-     * @throws IllegalArgumentException if a name occurs more than once
-     */
-    public Fields(List<String> fields) {
-        _fields = new ArrayList<String>(fields.size());
-        for (String field : fields) {
-            // the index map doubles as the duplicate check, keeping
-            // construction linear in the number of fields
-            if (_index.containsKey(field))
-                throw new IllegalArgumentException(String.format(
-                        "duplicate field '%s'", field));
-            _index.put(field, _fields.size());
-            _fields.add(field);
-        }
-    }
-
-    /**
-     * Picks from {@code tuple} the values at the positions this schema
-     * assigns to the names in {@code selector}, in selector order.
-     *
-     * @throws IllegalArgumentException if the selector names a field this
-     *         schema does not contain
-     */
-    public List<Object> select(Fields selector, List<Object> tuple) {
-        List<Object> ret = new ArrayList<Object>(selector.size());
-        for (String s : selector) {
-            // fieldIndex reports an unknown name explicitly instead of
-            // failing with a NullPointerException while unboxing
-            ret.add(tuple.get(fieldIndex(s)));
-        }
-        return ret;
-    }
-
-    /** Returns a fresh copy of the field names. */
-    public List<String> toList() {
-        return new ArrayList<String>(_fields);
-    }
-
-    public int size() {
-        return _fields.size();
-    }
-
-    public String get(int index) {
-        return _fields.get(index);
-    }
-
-    public Iterator<String> iterator() {
-        return _fields.iterator();
-    }
-
-    /**
-     * Returns the position of the specified field.
-     *
-     * @throws IllegalArgumentException if the field does not exist
-     */
-    public int fieldIndex(String field) {
-        Integer ret = _index.get(field);
-        if (ret == null) {
-            throw new IllegalArgumentException(field + " does not exist");
-        }
-        return ret;
-    }
-
-    /**
-     * Returns true if this contains the specified name of the field.
-     */
-    public boolean contains(String field) {
-        return _index.containsKey(field);
-    }
-
-    @Override
-    public String toString() {
-        return _fields.toString();
-    }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/tuple/ITuple.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/tuple/ITuple.java b/jstorm-client/src/main/java/backtype/storm/tuple/ITuple.java
deleted file mode 100644
index b00279d..0000000
--- a/jstorm-client/src/main/java/backtype/storm/tuple/ITuple.java
+++ /dev/null
@@ -1,119 +0,0 @@
-package backtype.storm.tuple;
-
-import java.util.List;
-
-public interface ITuple {
-
- /**
- * Returns the number of fields in this tuple.
- */
- public int size();
-
- /**
- * Returns true if this tuple contains the specified name of the field.
- */
- public boolean contains(String field);
-
- /**
- * Gets the names of the fields in this tuple.
- */
- public Fields getFields();
-
- /**
- * Returns the position of the specified field in this tuple.
- */
- public int fieldIndex(String field);
-
- /**
- * Returns a subset of the tuple based on the fields selector.
- */
- public List<Object> select(Fields selector);
-
- /**
- * Gets the field at position i in the tuple. Returns object since tuples are dynamically typed.
- */
- public Object getValue(int i);
-
- /**
- * Returns the String at position i in the tuple. If that field is not a String,
- * you will get a runtime error.
- */
- public String getString(int i);
-
- /**
- * Returns the Integer at position i in the tuple. If that field is not an Integer,
- * you will get a runtime error.
- */
- public Integer getInteger(int i);
-
- /**
- * Returns the Long at position i in the tuple. If that field is not a Long,
- * you will get a runtime error.
- */
- public Long getLong(int i);
-
- /**
- * Returns the Boolean at position i in the tuple. If that field is not a Boolean,
- * you will get a runtime error.
- */
- public Boolean getBoolean(int i);
-
- /**
- * Returns the Short at position i in the tuple. If that field is not a Short,
- * you will get a runtime error.
- */
- public Short getShort(int i);
-
- /**
- * Returns the Byte at position i in the tuple. If that field is not a Byte,
- * you will get a runtime error.
- */
- public Byte getByte(int i);
-
- /**
- * Returns the Double at position i in the tuple. If that field is not a Double,
- * you will get a runtime error.
- */
- public Double getDouble(int i);
-
- /**
- * Returns the Float at position i in the tuple. If that field is not a Float,
- * you will get a runtime error.
- */
- public Float getFloat(int i);
-
- /**
- * Returns the byte array at position i in the tuple. If that field is not a byte array,
- * you will get a runtime error.
- */
- public byte[] getBinary(int i);
-
-
- /**
- * By-name counterpart of {@link #getValue(int)}.
- */
- public Object getValueByField(String field);
-
- /**
- * By-name counterpart of {@link #getString(int)}.
- */
- public String getStringByField(String field);
-
- /**
- * By-name counterpart of {@link #getInteger(int)}.
- */
- public Integer getIntegerByField(String field);
-
- /**
- * By-name counterpart of {@link #getLong(int)}.
- */
- public Long getLongByField(String field);
-
- /**
- * By-name counterpart of {@link #getBoolean(int)}.
- */
- public Boolean getBooleanByField(String field);
-
- /**
- * By-name counterpart of {@link #getShort(int)}.
- */
- public Short getShortByField(String field);
-
- /**
- * By-name counterpart of {@link #getByte(int)}.
- */
- public Byte getByteByField(String field);
-
- /**
- * By-name counterpart of {@link #getDouble(int)}.
- */
- public Double getDoubleByField(String field);
-
- /**
- * By-name counterpart of {@link #getFloat(int)}.
- */
- public Float getFloatByField(String field);
-
- /**
- * By-name counterpart of {@link #getBinary(int)}.
- */
- public byte[] getBinaryByField(String field);
-
- /**
- * Gets all the values in this tuple.
- */
- public List<Object> getValues();
-
-
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/tuple/MessageId.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/tuple/MessageId.java b/jstorm-client/src/main/java/backtype/storm/tuple/MessageId.java
deleted file mode 100644
index b1bd68a..0000000
--- a/jstorm-client/src/main/java/backtype/storm/tuple/MessageId.java
+++ /dev/null
@@ -1,86 +0,0 @@
-package backtype.storm.tuple;
-
-import backtype.storm.utils.Utils;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Random;
-import java.util.Set;
-
-/**
- * Identifies a tuple by a map from anchor id to the id value carried for
- * that anchor.
- */
-public class MessageId {
-    private Map<Long, Long> _anchorsToIds;
-
-    protected MessageId(Map<Long, Long> anchorsToIds) {
-        _anchorsToIds = anchorsToIds;
-    }
-
-    /** Draws an id from {@code Utils.secureRandomLong()}. */
-    @Deprecated
-    public static long generateId() {
-        return Utils.secureRandomLong();
-    }
-
-    /** Draws an id from the supplied random source. */
-    public static long generateId(Random rand) {
-        return rand.nextLong();
-    }
-
-    /** Creates an id with no anchors, backed by a new empty map. */
-    public static MessageId makeUnanchored() {
-        Map<Long, Long> none = new HashMap<Long, Long>();
-        return makeId(none);
-    }
-
-    /** Wraps the given map without copying it. */
-    public static MessageId makeId(Map<Long, Long> anchorsToIds) {
-        return new MessageId(anchorsToIds);
-    }
-
-    /** Creates an id holding the single mapping {@code id -> val}. */
-    public static MessageId makeRootId(long id, long val) {
-        Map<Long, Long> single = new HashMap<Long, Long>();
-        single.put(id, val);
-        return new MessageId(single);
-    }
-
-    /** Returns the backing map itself, not a copy. */
-    public Map<Long, Long> getAnchorsToIds() {
-        return _anchorsToIds;
-    }
-
-    public Set<Long> getAnchors() {
-        return _anchorsToIds.keySet();
-    }
-
-    @Override
-    public int hashCode() {
-        return _anchorsToIds.hashCode();
-    }
-
-    /** Two ids are equal when their anchor maps are equal. */
-    @Override
-    public boolean equals(Object other) {
-        if (!(other instanceof MessageId)) {
-            return false;
-        }
-        MessageId that = (MessageId) other;
-        return _anchorsToIds.equals(that._anchorsToIds);
-    }
-
-    @Override
-    public String toString() {
-        return _anchorsToIds.toString();
-    }
-
-    /**
-     * Writes the entry count as a variable-length int, then each anchor and
-     * its id as a pair of longs.
-     */
-    public void serialize(Output out) throws IOException {
-        out.writeInt(_anchorsToIds.size(), true);
-        for (Entry<Long, Long> pair : _anchorsToIds.entrySet()) {
-            out.writeLong(pair.getKey());
-            out.writeLong(pair.getValue());
-        }
-    }
-
-    /** Reads back what {@link #serialize(Output)} wrote. */
-    public static MessageId deserialize(Input in) throws IOException {
-        int remaining = in.readInt(true);
-        Map<Long, Long> restored = new HashMap<Long, Long>();
-        while (remaining-- > 0) {
-            long anchor = in.readLong();
-            long id = in.readLong();
-            restored.put(anchor, id);
-        }
-        return new MessageId(restored);
-    }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/tuple/Tuple.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/tuple/Tuple.java b/jstorm-client/src/main/java/backtype/storm/tuple/Tuple.java
deleted file mode 100644
index f170cd2..0000000
--- a/jstorm-client/src/main/java/backtype/storm/tuple/Tuple.java
+++ /dev/null
@@ -1,43 +0,0 @@
-package backtype.storm.tuple;
-
-import backtype.storm.generated.GlobalStreamId;
-import java.util.List;
-
-/**
- * The tuple is the main data structure in Storm. A tuple is a named list of values,
- * where each value can be any type. Tuples are dynamically typed -- the types of the fields
- * do not need to be declared. Tuples have helper methods like getInteger and getString
- * to get field values without having to cast the result.
- *
- * Storm needs to know how to serialize all the values in a tuple. By default, Storm
- * knows how to serialize the primitive types, strings, and byte arrays. If you want to
- * use another type, you'll need to implement and register a serializer for that type.
- * See <a href="http://github.com/nathanmarz/storm/wiki/Serialization">the Serialization
- * wiki page</a> for more info.
- */
-public interface Tuple extends ITuple{
-
- /**
- * Returns the global stream id (component + stream) of this tuple.
- */
- public GlobalStreamId getSourceGlobalStreamid();
-
- /**
- * Gets the id of the component that created this tuple.
- */
- public String getSourceComponent();
-
- /**
- * Gets the id of the task that created this tuple.
- */
- public int getSourceTask();
-
- /**
- * Gets the id of the stream that this tuple was emitted to.
- */
- public String getSourceStreamId();
-
- /**
- * Gets the message id that is associated with this tuple.
- */
- public MessageId getMessageId();
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/tuple/TupleExt.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/tuple/TupleExt.java b/jstorm-client/src/main/java/backtype/storm/tuple/TupleExt.java
deleted file mode 100644
index 7307342..0000000
--- a/jstorm-client/src/main/java/backtype/storm/tuple/TupleExt.java
+++ /dev/null
@@ -1,12 +0,0 @@
-package backtype.storm.tuple;
-
-public interface TupleExt extends Tuple {
- /**
- * Gets the target task id previously stored with
- * {@link #setTargetTaskId(int)}.
- *
- * @return the target task id
- */
- int getTargetTaskId();
-
- /**
- * Stores the target task id for this tuple.
- *
- * @param targetTaskId the task id to store
- */
- void setTargetTaskId(int targetTaskId);
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/tuple/TupleImpl.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/tuple/TupleImpl.java b/jstorm-client/src/main/java/backtype/storm/tuple/TupleImpl.java
deleted file mode 100644
index 2f47f6e..0000000
--- a/jstorm-client/src/main/java/backtype/storm/tuple/TupleImpl.java
+++ /dev/null
@@ -1,342 +0,0 @@
-package backtype.storm.tuple;
-
-import backtype.storm.generated.GlobalStreamId;
-import backtype.storm.task.GeneralTopologyContext;
-import backtype.storm.utils.IndifferentAccessMap;
-import clojure.lang.ASeq;
-import clojure.lang.Counted;
-import clojure.lang.IMeta;
-import clojure.lang.IPersistentMap;
-import clojure.lang.ISeq;
-import clojure.lang.Indexed;
-import clojure.lang.Keyword;
-import clojure.lang.MapEntry;
-import clojure.lang.Obj;
-import clojure.lang.PersistentArrayMap;
-import clojure.lang.Seqable;
-import clojure.lang.Symbol;
-import java.util.List;
-
-/**
- * {@link Tuple} backed by a list of values plus the task and stream that
- * produced them. Field names are resolved through the topology context, and
- * the tuple is also exposed to Clojure as a seq, an indexed collection and a
- * map. Equality and hashing are identity-based.
- */
-public class TupleImpl extends IndifferentAccessMap implements Seqable,
-        Indexed, IMeta, Tuple {
-    private List<Object> values;
-    private int taskId;
-    private String streamId;
-    private GeneralTopologyContext context;
-    private MessageId id;
-    private IPersistentMap _meta = null;
-
-    /**
-     * @throws IllegalArgumentException if the number of values differs from
-     *         the number of fields declared for the source component/stream
-     */
-    public TupleImpl(GeneralTopologyContext context, List<Object> values,
-            int taskId, String streamId, MessageId id) {
-        this.values = values;
-        this.taskId = taskId;
-        this.streamId = streamId;
-        this.id = id;
-        this.context = context;
-
-        String componentId = context.getComponentId(taskId);
-        Fields schema = context.getComponentOutputFields(componentId, streamId);
-        if (values.size() != schema.size()) {
-            throw new IllegalArgumentException(
-                    "Tuple created with wrong number of fields. " + "Expected "
-                            + schema.size() + " fields but got "
-                            + values.size() + " fields");
-        }
-    }
-
-    /** Creates a tuple with an unanchored message id. */
-    public TupleImpl(GeneralTopologyContext context, List<Object> values,
-            int taskId, String streamId) {
-        this(context, values, taskId, streamId, MessageId.makeUnanchored());
-    }
-
-    // sampling timestamps; null until set
-    Long _processSampleStartTime = null;
-    Long _executeSampleStartTime = null;
-
-    public void setProcessSampleStartTime(long ms) {
-        _processSampleStartTime = ms;
-    }
-
-    public Long getProcessSampleStartTime() {
-        return _processSampleStartTime;
-    }
-
-    public void setExecuteSampleStartTime(long ms) {
-        _executeSampleStartTime = ms;
-    }
-
-    public Long getExecuteSampleStartTime() {
-        return _executeSampleStartTime;
-    }
-
-    long _outAckVal = 0;
-
-    /** XORs {@code val} into the accumulated ack value. */
-    public void updateAckVal(long val) {
-        _outAckVal = _outAckVal ^ val;
-    }
-
-    public long getAckVal() {
-        return _outAckVal;
-    }
-
-    public int size() {
-        return values.size();
-    }
-
-    public int fieldIndex(String field) {
-        return getFields().fieldIndex(field);
-    }
-
-    public boolean contains(String field) {
-        return getFields().contains(field);
-    }
-
-    public Object getValue(int i) {
-        return values.get(i);
-    }
-
-    public String getString(int i) {
-        return (String) values.get(i);
-    }
-
-    public Integer getInteger(int i) {
-        return (Integer) values.get(i);
-    }
-
-    public Long getLong(int i) {
-        return (Long) values.get(i);
-    }
-
-    public Boolean getBoolean(int i) {
-        return (Boolean) values.get(i);
-    }
-
-    public Short getShort(int i) {
-        return (Short) values.get(i);
-    }
-
-    public Byte getByte(int i) {
-        return (Byte) values.get(i);
-    }
-
-    public Double getDouble(int i) {
-        return (Double) values.get(i);
-    }
-
-    public Float getFloat(int i) {
-        return (Float) values.get(i);
-    }
-
-    public byte[] getBinary(int i) {
-        return (byte[]) values.get(i);
-    }
-
-    public Object getValueByField(String field) {
-        return values.get(fieldIndex(field));
-    }
-
-    public String getStringByField(String field) {
-        return (String) values.get(fieldIndex(field));
-    }
-
-    public Integer getIntegerByField(String field) {
-        return (Integer) values.get(fieldIndex(field));
-    }
-
-    public Long getLongByField(String field) {
-        return (Long) values.get(fieldIndex(field));
-    }
-
-    public Boolean getBooleanByField(String field) {
-        return (Boolean) values.get(fieldIndex(field));
-    }
-
-    public Short getShortByField(String field) {
-        return (Short) values.get(fieldIndex(field));
-    }
-
-    public Byte getByteByField(String field) {
-        return (Byte) values.get(fieldIndex(field));
-    }
-
-    public Double getDoubleByField(String field) {
-        return (Double) values.get(fieldIndex(field));
-    }
-
-    public Float getFloatByField(String field) {
-        return (Float) values.get(fieldIndex(field));
-    }
-
-    public byte[] getBinaryByField(String field) {
-        return (byte[]) values.get(fieldIndex(field));
-    }
-
-    /** Returns the backing value list itself, not a copy. */
-    public List<Object> getValues() {
-        return values;
-    }
-
-    /** Looks the schema up in the topology context on every call. */
-    public Fields getFields() {
-        return context.getComponentOutputFields(getSourceComponent(),
-                getSourceStreamId());
-    }
-
-    public List<Object> select(Fields selector) {
-        return getFields().select(selector, values);
-    }
-
-    public GlobalStreamId getSourceGlobalStreamid() {
-        return new GlobalStreamId(getSourceComponent(), streamId);
-    }
-
-    public String getSourceComponent() {
-        return context.getComponentId(taskId);
-    }
-
-    public int getSourceTask() {
-        return taskId;
-    }
-
-    public String getSourceStreamId() {
-        return streamId;
-    }
-
-    public MessageId getMessageId() {
-        return id;
-    }
-
-    @Override
-    public String toString() {
-        // plain concatenation prints "null" for a missing id instead of
-        // throwing a NullPointerException
-        return "source: " + getSourceComponent() + ":" + taskId + ", stream: "
-                + streamId + ", id: " + id + ", "
-                + values.toString();
-    }
-
-    @Override
-    public boolean equals(Object other) {
-        return this == other;
-    }
-
-    @Override
-    public int hashCode() {
-        return System.identityHashCode(this);
-    }
-
-    private final Keyword makeKeyword(String name) {
-        return Keyword.intern(Symbol.create(name));
-    }
-
-    /* ILookup */
-    /**
-     * Looks a field up by keyword or string name; returns null for an
-     * unknown field or any other key type.
-     */
-    @Override
-    public Object valAt(Object o) {
-        try {
-            if (o instanceof Keyword) {
-                return getValueByField(((Keyword) o).getName());
-            } else if (o instanceof String) {
-                return getValueByField((String) o);
-            }
-        } catch (IllegalArgumentException e) {
-            // unknown field name: reported as "no value" below
-        }
-        return null;
-    }
-
-    /* Seqable */
-    /** Returns a seq of field/value entries, or null for an empty tuple. */
-    public ISeq seq() {
-        if (values.size() > 0) {
-            return new Seq(getFields().toList(), values, 0);
-        }
-        return null;
-    }
-
-    /** Seq over the (field, value) pairs starting at position {@code i}. */
-    static class Seq extends ASeq implements Counted {
-        final List<String> fields;
-        final List<Object> values;
-        final int i;
-
-        Seq(List<String> fields, List<Object> values, int i) {
-            this.fields = fields;
-            this.values = values;
-            assert i >= 0;
-            this.i = i;
-        }
-
-        public Seq(IPersistentMap meta, List<String> fields,
-                List<Object> values, int i) {
-            super(meta);
-            this.fields = fields;
-            this.values = values;
-            assert i >= 0;
-            this.i = i;
-        }
-
-        public Object first() {
-            return new MapEntry(fields.get(i), values.get(i));
-        }
-
-        public ISeq next() {
-            if (i + 1 < fields.size()) {
-                return new Seq(fields, values, i + 1);
-            }
-            return null;
-        }
-
-        public int count() {
-            assert fields.size() - i >= 0 : "index out of bounds";
-            // i being the position in the fields of this seq, the remainder of
-            // the seq is the size
-            return fields.size() - i;
-        }
-
-        public Obj withMeta(IPersistentMap meta) {
-            return new Seq(meta, fields, values, i);
-        }
-    }
-
-    /* Indexed */
-    /**
-     * Returns the value at position {@code i}, or null when {@code i} is at
-     * or past the end. A negative index still fails in {@code List.get}.
-     */
-    public Object nth(int i) {
-        if (i < values.size()) {
-            return values.get(i);
-        } else {
-            return null;
-        }
-    }
-
-    /**
-     * Returns the value at position {@code i}, or {@code notfound} when the
-     * index is outside the tuple. A field that is present but holds null
-     * yields null, not {@code notfound}.
-     */
-    public Object nth(int i, Object notfound) {
-        if (i >= 0 && i < values.size()) {
-            return values.get(i);
-        }
-        return notfound;
-    }
-
-    /* Counted */
-    public int count() {
-        return values.size();
-    }
-
-    /* IMeta */
-    /** Lazily builds the metadata map with stream, component and task. */
-    public IPersistentMap meta() {
-        if (_meta == null) {
-            _meta = new PersistentArrayMap(new Object[] {
-                    makeKeyword("stream"), getSourceStreamId(),
-                    makeKeyword("component"), getSourceComponent(),
-                    makeKeyword("task"), getSourceTask() });
-        }
-        return _meta;
-    }
-
-    /** Builds a field-name to value map from the current values. */
-    private PersistentArrayMap toMap() {
-        Object array[] = new Object[values.size() * 2];
-        List<String> fields = getFields().toList();
-        for (int i = 0; i < values.size(); i++) {
-            array[i * 2] = fields.get(i);
-            array[(i * 2) + 1] = values.get(i);
-        }
-        return new PersistentArrayMap(array);
-    }
-
-    /** Returns the map view, building and caching it on first use. */
-    public IPersistentMap getMap() {
-        if (_map == null) {
-            setMap(toMap());
-        }
-        return _map;
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/tuple/TupleImplExt.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/tuple/TupleImplExt.java b/jstorm-client/src/main/java/backtype/storm/tuple/TupleImplExt.java
deleted file mode 100644
index 5d4b487..0000000
--- a/jstorm-client/src/main/java/backtype/storm/tuple/TupleImplExt.java
+++ /dev/null
@@ -1,31 +0,0 @@
-package backtype.storm.tuple;
-
-import java.util.List;
-
-import backtype.storm.task.GeneralTopologyContext;
-
-/**
- * {@link TupleImpl} that additionally carries a mutable target task id.
- */
-public class TupleImplExt extends TupleImpl implements TupleExt {
-
- // left at the int default (0) until setTargetTaskId is called
- protected int targetTaskId;
-
- /** Passes all arguments straight to the matching TupleImpl constructor. */
- public TupleImplExt(GeneralTopologyContext context, List<Object> values,
- int taskId, String streamId) {
- super(context, values, taskId, streamId);
- }
-
- /** Passes all arguments straight to the matching TupleImpl constructor. */
- public TupleImplExt(GeneralTopologyContext context, List<Object> values,
- int taskId, String streamId, MessageId id) {
- super(context, values, taskId, streamId, id);
- }
-
- @Override
- public int getTargetTaskId() {
- return targetTaskId;
- }
-
- @Override
- public void setTargetTaskId(int targetTaskId) {
- this.targetTaskId = targetTaskId;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/tuple/Values.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/tuple/Values.java b/jstorm-client/src/main/java/backtype/storm/tuple/Values.java
deleted file mode 100644
index d374f67..0000000
--- a/jstorm-client/src/main/java/backtype/storm/tuple/Values.java
+++ /dev/null
@@ -1,20 +0,0 @@
-package backtype.storm.tuple;
-
-import java.util.ArrayList;
-
-/**
- * List of tuple values that can be built inline, e.g.
- * {@code new Values("field1", 2, 3)}.
- */
-public class Values extends ArrayList<Object> {
-    /** Creates an empty value list. */
-    public Values() {
-    }
-
-    /** Creates a list holding {@code vals} in the order given. */
-    public Values(Object... vals) {
-        super(vals.length);
-        for (int pos = 0; pos < vals.length; pos++) {
-            add(vals[pos]);
-        }
-    }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/utils/BufferFileInputStream.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/utils/BufferFileInputStream.java b/jstorm-client/src/main/java/backtype/storm/utils/BufferFileInputStream.java
deleted file mode 100644
index c3e1a20..0000000
--- a/jstorm-client/src/main/java/backtype/storm/utils/BufferFileInputStream.java
+++ /dev/null
@@ -1,37 +0,0 @@
-package backtype.storm.utils;
-
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.Arrays;
-
-/**
- * Reads a file in chunks of at most {@code bufferSize} bytes.
- */
-public class BufferFileInputStream implements java.io.Closeable {
-    byte[] buffer;
-    FileInputStream stream;
-
-    /**
-     * @param file       path of the file to read
-     * @param bufferSize maximum chunk size in bytes; must be positive
-     * @throws IllegalArgumentException if {@code bufferSize} is not positive
-     * @throws FileNotFoundException    if the file cannot be opened
-     */
-    public BufferFileInputStream(String file, int bufferSize)
-            throws FileNotFoundException {
-        if (bufferSize <= 0) {
-            throw new IllegalArgumentException(
-                    "bufferSize must be positive: " + bufferSize);
-        }
-        // allocate before opening so a failure here cannot leak the stream
-        buffer = new byte[bufferSize];
-        stream = new FileInputStream(file);
-    }
-
-    /** Opens {@code file} with a 15 KiB chunk size. */
-    public BufferFileInputStream(String file) throws FileNotFoundException {
-        this(file, 15 * 1024);
-    }
-
-    /**
-     * Reads the next chunk.
-     *
-     * @return a newly allocated array holding the bytes read; an empty array
-     *         at end of file, in which case the stream has been closed
-     */
-    public byte[] read() throws IOException {
-        int length = stream.read(buffer);
-        if (length == -1) {
-            close();
-            return new byte[0];
-        }
-        // always copy: handing out the internal buffer would let the next
-        // read overwrite data the caller still holds
-        return Arrays.copyOf(buffer, length);
-    }
-
-    public void close() throws IOException {
-        stream.close();
-    }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/utils/CRC32OutputStream.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/utils/CRC32OutputStream.java b/jstorm-client/src/main/java/backtype/storm/utils/CRC32OutputStream.java
deleted file mode 100644
index 46265b0..0000000
--- a/jstorm-client/src/main/java/backtype/storm/utils/CRC32OutputStream.java
+++ /dev/null
@@ -1,27 +0,0 @@
-package backtype.storm.utils;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.zip.CRC32;
-
-/**
- * Output stream that stores nothing and only feeds every written byte into
- * a CRC32 checksum.
- */
-public class CRC32OutputStream extends OutputStream {
-    private CRC32 hasher = new CRC32();
-
-    public CRC32OutputStream() {
-    }
-
-    /** Returns the checksum of everything written so far. */
-    public long getValue() {
-        return this.hasher.getValue();
-    }
-
-    @Override
-    public void write(int i) throws IOException {
-        this.hasher.update(i);
-    }
-
-    /**
-     * Adds a slice of {@code bytes} to the checksum. Despite its name,
-     * {@code end} is handed to {@code CRC32.update} as the number of bytes
-     * to consume starting at {@code start}.
-     */
-    @Override
-    public void write(byte[] bytes, int start, int end) throws IOException {
-        this.hasher.update(bytes, start, end);
-    }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/utils/ClojureTimerTask.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/utils/ClojureTimerTask.java b/jstorm-client/src/main/java/backtype/storm/utils/ClojureTimerTask.java
deleted file mode 100644
index b9094e2..0000000
--- a/jstorm-client/src/main/java/backtype/storm/utils/ClojureTimerTask.java
+++ /dev/null
@@ -1,18 +0,0 @@
-package backtype.storm.utils;
-
-import clojure.lang.IFn;
-import java.util.TimerTask;
-
-/**
- * {@link TimerTask} whose {@code run()} simply runs a Clojure function.
- */
-public class ClojureTimerTask extends TimerTask {
- // the function executed on every run()
- IFn _afn;
-
- public ClojureTimerTask(IFn afn) {
- super();
- _afn = afn;
- }
-
- /** Runs the wrapped function via {@code IFn.run()}. */
- @Override
- public void run() {
- _afn.run();
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/utils/Container.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/utils/Container.java b/jstorm-client/src/main/java/backtype/storm/utils/Container.java
deleted file mode 100644
index b8a6f12..0000000
--- a/jstorm-client/src/main/java/backtype/storm/utils/Container.java
+++ /dev/null
@@ -1,7 +0,0 @@
-package backtype.storm.utils;
-
-import java.io.Serializable;
-
-public class Container implements Serializable { // serializable holder for a single object reference
- public Object object; // the held reference; public and non-final, so it can be reassigned directly
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/utils/DRPCClient.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/utils/DRPCClient.java b/jstorm-client/src/main/java/backtype/storm/utils/DRPCClient.java
deleted file mode 100644
index 975d7d8..0000000
--- a/jstorm-client/src/main/java/backtype/storm/utils/DRPCClient.java
+++ /dev/null
@@ -1,70 +0,0 @@
-package backtype.storm.utils;
-
-import org.apache.thrift7.TException;
-import org.apache.thrift7.protocol.TBinaryProtocol;
-import org.apache.thrift7.transport.TFramedTransport;
-import org.apache.thrift7.transport.TSocket;
-import org.apache.thrift7.transport.TTransport;
-
-import backtype.storm.generated.DRPCExecutionException;
-import backtype.storm.generated.DistributedRPC;
-
-/**
- * DRPC client that talks to one host/port over a framed Thrift transport.
- *
- * The connection is opened in the constructor. After a failed call the
- * connection is torn down and re-established on the next {@link #execute}.
- */
-public class DRPCClient implements DistributedRPC.Iface {
-    private final String host;
-    private final int port;
-    /** Socket timeout handed to {@code TSocket.setTimeout}; {@code null} means "do not set one". */
-    private final Integer timeout;
-    private TTransport conn;
-    private DistributedRPC.Client client;
-
-    /**
-     * Creates the client and connects immediately.
-     *
-     * @param host    server host
-     * @param port    server port
-     * @param timeout socket timeout, or {@code null} to leave the socket default
-     * @throws RuntimeException wrapping the {@code TException} if connecting fails
-     */
-    public DRPCClient(String host, int port, Integer timeout) {
-        this.host = host;
-        this.port = port;
-        this.timeout = timeout;
-        try {
-            connect();
-        } catch (TException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    /** Same as the three-argument constructor with no explicit timeout. */
-    public DRPCClient(String host, int port) {
-        this(host, port, null);
-    }
-
-    /** Builds a fresh socket, framed transport and client, then opens the transport. */
-    private void connect() throws TException {
-        TSocket socket = new TSocket(host, port);
-        if (timeout != null) {
-            socket.setTimeout(timeout);
-        }
-        conn = new TFramedTransport(socket);
-        client = new DistributedRPC.Client(new TBinaryProtocol(conn));
-        conn.open();
-    }
-
-    /**
-     * Drops the current client and closes its transport so the next call
-     * reconnects. Without the close, every reconnect would orphan the previous
-     * socket.
-     */
-    private void reset() {
-        client = null;
-        if (conn != null) {
-            conn.close();
-            conn = null;
-        }
-    }
-
-    public String getHost() {
-        return host;
-    }
-
-    public int getPort() {
-        return port;
-    }
-
-    /**
-     * Executes a DRPC function, reconnecting first if an earlier call failed.
-     *
-     * @param func function name
-     * @param args function argument
-     * @return the result returned by the server
-     * @throws TException             on transport/protocol failure; the connection is reset
-     * @throws DRPCExecutionException if the server reports a failure; the connection is reset
-     */
-    public String execute(String func, String args) throws TException,
-            DRPCExecutionException {
-        try {
-            if (client == null) {
-                connect();
-            }
-            return client.execute(func, args);
-        } catch (TException e) {
-            reset();
-            throw e;
-        } catch (DRPCExecutionException e) {
-            reset();
-            throw e;
-        }
-    }
-
-    /** Closes the transport if one is currently held; safe to call after a failed execute. */
-    public void close() {
-        if (conn != null) {
-            conn.close();
-        }
-    }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/utils/DisruptorQueue.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/utils/DisruptorQueue.java b/jstorm-client/src/main/java/backtype/storm/utils/DisruptorQueue.java
deleted file mode 100644
index 8d9e861..0000000
--- a/jstorm-client/src/main/java/backtype/storm/utils/DisruptorQueue.java
+++ /dev/null
@@ -1,71 +0,0 @@
-package backtype.storm.utils;
-
-import backtype.storm.metric.api.IStatefulObject;
-
-import com.lmax.disruptor.EventHandler;
-import com.lmax.disruptor.InsufficientCapacityException;
-import com.lmax.disruptor.WaitStrategy;
-import com.lmax.disruptor.dsl.ProducerType;
-
-/**
- *
- * A single consumer queue that uses the LMAX Disruptor. The key to the
- * performance is the ability to catch up to the producer by processing tuples
- * in batches.
- */
-public abstract class DisruptorQueue implements IStatefulObject {
-    /** Selects which implementation {@link #mkInstance} hands out. */
-    private static boolean CAPACITY_LIMITED = false;
-
-    /** Forwards the flag to {@code DisruptorQueueImpl.setUseSleep}. */
-    public static void setUseSleep(boolean useSleep) {
-        DisruptorQueueImpl.setUseSleep(useSleep);
-    }
-
-    /**
-     * @param limited {@code true} to make {@link #mkInstance} create
-     *                {@code DisruptorQueueImpl}, {@code false} for
-     *                {@code DisruptorWrapBlockingQueue}
-     */
-    public static void setLimited(boolean limited) {
-        CAPACITY_LIMITED = limited;
-    }
-
-    /**
-     * Factory for queue instances; the concrete type depends on the flag set
-     * through {@link #setLimited}.
-     */
-    public static DisruptorQueue mkInstance(String queueName,
-            ProducerType producerType, int bufferSize, WaitStrategy wait) {
-        if (!CAPACITY_LIMITED) {
-            return new DisruptorWrapBlockingQueue(queueName, producerType,
-                    bufferSize, wait);
-        }
-        return new DisruptorQueueImpl(queueName, producerType, bufferSize,
-                wait);
-    }
-
-    public abstract String getName();
-
-    public abstract void haltWithInterrupt();
-
-    public abstract Object poll();
-
-    public abstract Object take();
-
-    public abstract void consumeBatch(EventHandler<Object> handler);
-
-    public abstract void consumeBatchWhenAvailable(EventHandler<Object> handler);
-
-    public abstract void publish(Object obj);
-
-    public abstract void publish(Object obj, boolean block)
-            throws InsufficientCapacityException;
-
-    public abstract void consumerStarted();
-
-    public abstract void clear();
-
-    public abstract long population();
-
-    public abstract long capacity();
-
-    public abstract long writePos();
-
-    public abstract long readPos();
-
-    public abstract float pctFull();
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/utils/DisruptorQueueImpl.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/utils/DisruptorQueueImpl.java b/jstorm-client/src/main/java/backtype/storm/utils/DisruptorQueueImpl.java
deleted file mode 100644
index 0c334b5..0000000
--- a/jstorm-client/src/main/java/backtype/storm/utils/DisruptorQueueImpl.java
+++ /dev/null
@@ -1,298 +0,0 @@
-package backtype.storm.utils;
-
-import java.util.HashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import org.apache.log4j.Logger;
-
-import backtype.storm.metric.api.IStatefulObject;
-import backtype.storm.utils.disruptor.AbstractSequencerExt;
-import backtype.storm.utils.disruptor.RingBuffer;
-
-import com.lmax.disruptor.AlertException;
-import com.lmax.disruptor.EventFactory;
-import com.lmax.disruptor.EventHandler;
-import com.lmax.disruptor.InsufficientCapacityException;
-import com.lmax.disruptor.Sequence;
-import com.lmax.disruptor.SequenceBarrier;
-import com.lmax.disruptor.TimeoutException;
-import com.lmax.disruptor.WaitStrategy;
-import com.lmax.disruptor.dsl.ProducerType;
-
-/**
- *
- * A single consumer queue that uses the LMAX Disruptor. The key to the
- * performance is the ability to catch up to the producer by processing tuples
- * in batches.
- */
-public class DisruptorQueueImpl extends DisruptorQueue {
-    private static final Logger LOG = Logger.getLogger(DisruptorQueueImpl.class);
-    // Not read anywhere in this class; kept because it is package-visible.
-    static boolean useSleep = true;
-
-    /** Forwards the flag to {@code AbstractSequencerExt.setWaitSleep}. */
-    public static void setUseSleep(boolean useSleep) {
-        AbstractSequencerExt.setWaitSleep(useSleep);
-    }
-
-    // Sentinel: tells the batch consumer to drain the pre-start cache.
-    private static final Object FLUSH_CACHE = new Object();
-    // Sentinel: tells the batch consumer to stop processing.
-    private static final Object INTERRUPT = new Object();
-    private static final String PREFIX = "disruptor-";
-
-    private final String _queueName;
-    private final RingBuffer<MutableObject> _buffer;
-    private final Sequence _consumer;
-    private final SequenceBarrier _barrier;
-
-    // TODO: consider having a threadlocal cache of this variable to speed up
-    // reads?
-    volatile boolean consumerStartedFlag = false;
-
-    private final HashMap<String, Object> state = new HashMap<String, Object>(4);
-    // Holds objects published before consumerStarted() has been called.
-    private final ConcurrentLinkedQueue<Object> _cache = new ConcurrentLinkedQueue<Object>();
-    private final ReentrantReadWriteLock cacheLock = new ReentrantReadWriteLock();
-    private final Lock readLock = cacheLock.readLock();
-    private final Lock writeLock = cacheLock.writeLock();
-
-    /**
-     * Creates the ring buffer and its single consumer sequence.
-     *
-     * For a SINGLE producer the queue is marked started straight away. For any
-     * other producer type a FLUSH_CACHE sentinel is published first so that
-     * cached objects are delivered ahead of ring-buffer entries.
-     *
-     * @throws RuntimeException if the producer is not SINGLE and bufferSize is below 2
-     */
-    public DisruptorQueueImpl(String queueName, ProducerType producerType,
-            int bufferSize, WaitStrategy wait) {
-        this._queueName = PREFIX + queueName;
-        _buffer = RingBuffer.create(producerType, new ObjectEventFactory(),
-                bufferSize, wait);
-        _consumer = new Sequence();
-        _barrier = _buffer.newBarrier();
-        _buffer.addGatingSequences(_consumer);
-        if (producerType == ProducerType.SINGLE) {
-            consumerStartedFlag = true;
-        } else {
-            // make sure we flush the pending messages in cache first
-            if (bufferSize < 2) {
-                throw new RuntimeException("QueueSize must >= 2");
-            }
-            try {
-                publishDirect(FLUSH_CACHE, true);
-            } catch (InsufficientCapacityException e) {
-                throw new RuntimeException("This code should be unreachable!",
-                        e);
-            }
-        }
-    }
-
-    public String getName() {
-        return _queueName;
-    }
-
-    /** Consumes everything published up to the barrier's current cursor. */
-    public void consumeBatch(EventHandler<Object> handler) {
-        consumeBatchToCursor(_barrier.getCursor(), handler);
-    }
-
-    /** Publishes the INTERRUPT sentinel, which makes the batch consumer stop. */
-    public void haltWithInterrupt() {
-        publish(INTERRUPT);
-    }
-
-    /**
-     * Reads the slot at {@code sequence}, clears it, and only then marks it
-     * consumed. Advancing the consumer sequence first would let a producer
-     * reuse the slot before its value has been read and cleared.
-     */
-    private Object consumeSlot(long sequence) {
-        MutableObject slot = _buffer.get(sequence);
-        Object value = slot.o;
-        slot.setObject(null);
-        _consumer.set(sequence);
-        return value;
-    }
-
-    /**
-     * Non-blocking read of one entry.
-     *
-     * NOTE(review): sentinel objects are not filtered here and would be
-     * returned to the caller as-is - confirm callers never mix poll()/take()
-     * with the batch API.
-     *
-     * @return the next entry, or {@code null} if none is available
-     */
-    public Object poll() {
-        // @@@
-        // should use _cache.isEmpty, but it is slow
-        // I will change the logic later
-        if (consumerStartedFlag == false) {
-            return _cache.poll();
-        }
-
-        final long nextSequence = _consumer.get() + 1;
-        if (nextSequence <= _barrier.getCursor()) {
-            return consumeSlot(nextSequence);
-        }
-        return null;
-    }
-
-    /**
-     * Waits on the barrier for the next entry.
-     *
-     * @return the next entry, or {@code null} if the wait was interrupted or
-     *         timed out; before the consumer has started, whatever
-     *         {@code _cache.poll()} returns
-     * @throws RuntimeException wrapping an AlertException from the barrier
-     */
-    public Object take() {
-        // @@@
-        // should use _cache.isEmpty, but it is slow
-        // I will change the logic later
-        if (consumerStartedFlag == false) {
-            return _cache.poll();
-        }
-
-        final long nextSequence = _consumer.get() + 1;
-        try {
-            _barrier.waitFor(nextSequence);
-        } catch (AlertException e) {
-            LOG.error(e.getCause(), e);
-            throw new RuntimeException(e);
-        } catch (InterruptedException e) {
-            LOG.error("InterruptedException " + e.getCause());
-            return null;
-        } catch (TimeoutException e) {
-            LOG.error(e.getCause(), e);
-            return null;
-        }
-        return consumeSlot(nextSequence);
-    }
-
-    /**
-     * Waits for at least one entry and then consumes the available batch.
-     * Returns without consuming if the wait is interrupted or times out.
-     *
-     * @throws RuntimeException wrapping an AlertException from the barrier
-     */
-    public void consumeBatchWhenAvailable(EventHandler<Object> handler) {
-        try {
-            final long nextSequence = _consumer.get() + 1;
-            final long availableSequence = _barrier.waitFor(nextSequence);
-            if (availableSequence >= nextSequence) {
-                consumeBatchToCursor(availableSequence, handler);
-            }
-        } catch (AlertException e) {
-            LOG.error(e.getCause(), e);
-            throw new RuntimeException(e);
-        } catch (InterruptedException e) {
-            LOG.error("InterruptedException " + e.getCause());
-            return;
-        } catch (TimeoutException e) {
-            LOG.error(e.getCause(), e);
-            return;
-        }
-    }
-
-    /**
-     * Hands every entry from the consumer position up to {@code cursor} to
-     * {@code handler}, clearing each slot as it is read.
-     *
-     * A FLUSH_CACHE sentinel drains the pre-start cache into the handler; an
-     * INTERRUPT sentinel stops processing.
-     *
-     * @throws RuntimeException wrapping any non-interrupt exception from the handler
-     */
-    public void consumeBatchToCursor(long cursor, EventHandler<Object> handler) {
-        for (long curr = _consumer.get() + 1; curr <= cursor; curr++) {
-            try {
-                MutableObject mo = _buffer.get(curr);
-                Object o = mo.o;
-                mo.setObject(null);
-                if (o == FLUSH_CACHE) {
-                    Object c = null;
-                    while ((c = _cache.poll()) != null) {
-                        handler.onEvent(c, curr, true);
-                    }
-                } else if (o == INTERRUPT) {
-                    throw new InterruptedException(
-                            "Disruptor processing interrupted");
-                } else {
-                    handler.onEvent(o, curr, curr == cursor);
-                }
-            } catch (InterruptedException e) {
-                LOG.error(e.getCause());
-                // Slots up to curr have already been read and cleared. Mark
-                // them consumed so a later call does not replay them as nulls.
-                _consumer.set(curr);
-                return;
-            } catch (Exception e) {
-                LOG.error(e.getCause(), e);
-                throw new RuntimeException(e);
-            }
-        }
-        // TODO: only set this if the consumer cursor has changed?
-        _consumer.set(cursor);
-    }
-
-    /*
-     * Caches until consumerStarted is called, upon which the cache is flushed
-     * to the consumer
-     */
-    public void publish(Object obj) {
-        try {
-            publish(obj, true);
-        } catch (InsufficientCapacityException ex) {
-            // keep the cause so the "impossible" case is diagnosable
-            throw new RuntimeException("This code should be unreachable!", ex);
-        }
-    }
-
-    /** Non-blocking publish. */
-    public void tryPublish(Object obj) throws InsufficientCapacityException {
-        publish(obj, false);
-    }
-
-    /**
-     * Publishes to the ring buffer once the consumer has started; before
-     * that, the object is added to the cache instead.
-     *
-     * @param block whether to wait for a free slot
-     * @throws InsufficientCapacityException from the non-blocking claim path
-     */
-    public void publish(Object obj, boolean block)
-            throws InsufficientCapacityException {
-
-        boolean publishNow = consumerStartedFlag;
-
-        if (!publishNow) {
-            // Re-check under the read lock: consumerStarted() flips the flag
-            // under the write lock.
-            readLock.lock();
-            try {
-                publishNow = consumerStartedFlag;
-                if (!publishNow) {
-                    _cache.add(obj);
-                }
-            } finally {
-                readLock.unlock();
-            }
-        }
-
-        if (publishNow) {
-            publishDirect(obj, block);
-        }
-    }
-
-    /** Claims a slot, stores {@code obj} in it and publishes the slot. */
-    protected void publishDirect(Object obj, boolean block)
-            throws InsufficientCapacityException {
-        final long id;
-        if (block) {
-            id = _buffer.next();
-        } else {
-            id = _buffer.tryNext(1);
-        }
-        final MutableObject m = _buffer.get(id);
-        m.setObject(obj);
-        _buffer.publish(id);
-    }
-
-    /** Marks the consumer as started so later publishes go to the ring buffer. */
-    public void consumerStarted() {
-        writeLock.lock();
-        try {
-            consumerStartedFlag = true;
-        } finally {
-            writeLock.unlock();
-        }
-    }
-
-    /**
-     * Polls until the ring buffer reports no population.
-     *
-     * NOTE(review): before the consumer has started, poll() reads the cache
-     * and never advances the consumer sequence, so this loop may not end if
-     * the ring buffer already holds an entry - confirm it is only called
-     * after consumerStarted().
-     */
-    public void clear() {
-        while (population() != 0L) {
-            poll();
-        }
-    }
-
-    public long population() {
-        return (writePos() - readPos());
-    }
-
-    public long capacity() {
-        return _buffer.getBufferSize();
-    }
-
-    public long writePos() {
-        return _buffer.getCursor();
-    }
-
-    public long readPos() {
-        return _consumer.get();
-    }
-
-    public float pctFull() {
-        return (1.0F * population() / capacity());
-    }
-
-    /**
-     * @return the shared state map, refreshed with capacity, population,
-     *         write_pos and read_pos
-     */
-    @Override
-    public Object getState() {
-        // get readPos then writePos so it's never an under-estimate
-        long rp = readPos();
-        long wp = writePos();
-        state.put("capacity", capacity());
-        state.put("population", wp - rp);
-        state.put("write_pos", wp);
-        state.put("read_pos", rp);
-        return state;
-    }
-
-    /** Creates the empty holders that fill the ring buffer. */
-    public static class ObjectEventFactory implements
-            EventFactory<MutableObject> {
-        @Override
-        public MutableObject newInstance() {
-            return new MutableObject();
-        }
-    }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/utils/DisruptorWrapBlockingQueue.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/utils/DisruptorWrapBlockingQueue.java b/jstorm-client/src/main/java/backtype/storm/utils/DisruptorWrapBlockingQueue.java
deleted file mode 100644
index a701f39..0000000
--- a/jstorm-client/src/main/java/backtype/storm/utils/DisruptorWrapBlockingQueue.java
+++ /dev/null
@@ -1,192 +0,0 @@
-package backtype.storm.utils;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.log4j.Logger;
-
-import backtype.storm.metric.api.IStatefulObject;
-
-import com.lmax.disruptor.EventFactory;
-import com.lmax.disruptor.EventHandler;
-import com.lmax.disruptor.InsufficientCapacityException;
-import com.lmax.disruptor.WaitStrategy;
-import com.lmax.disruptor.dsl.ProducerType;
-
-/**
- *
- * A queue that exposes the DisruptorQueue interface but is backed by an
- * unbounded LinkedBlockingDeque instead of an LMAX ring buffer.
- * DisruptorQueue.mkInstance returns it when capacity limiting is switched off.
- */
-public class DisruptorWrapBlockingQueue extends DisruptorQueue {
-    private static final Logger LOG = Logger
-            .getLogger(DisruptorWrapBlockingQueue.class);
-
-    /** Nominal capacity used only for reporting; the deque itself is unbounded. */
-    private static final long QUEUE_CAPACITY = 512;
-    private final LinkedBlockingDeque<Object> queue;
-
-    private final String queueName;
-
-    /**
-     * @param queueName    name returned by {@link #getName()}
-     * @param producerType ignored by this implementation
-     * @param bufferSize   ignored by this implementation
-     * @param wait         ignored by this implementation
-     */
-    public DisruptorWrapBlockingQueue(String queueName,
-            ProducerType producerType, int bufferSize, WaitStrategy wait) {
-        this.queueName = queueName;
-        queue = new LinkedBlockingDeque<Object>();
-    }
-
-    public String getName() {
-        return queueName;
-    }
-
-    /** Drains whatever is queued right now without waiting. */
-    public void consumeBatch(EventHandler<Object> handler) {
-        consumeBatchToCursor(0, handler);
-    }
-
-    /** No-op in this implementation. */
-    public void haltWithInterrupt() {
-    }
-
-    public Object poll() {
-        return queue.poll();
-    }
-
-    /**
-     * @return the next element, or {@code null} if the wait was interrupted
-     */
-    public Object take() {
-        try {
-            return queue.take();
-        } catch (InterruptedException e) {
-            return null;
-        }
-    }
-
-    /**
-     * Feeds {@code object} and then every further queued element to
-     * {@code handler} until the queue is empty or the handler is interrupted.
-     *
-     * @param object first element to deliver; {@code null} delivers nothing
-     * @throws RuntimeException wrapping any non-interrupt exception from the handler
-     */
-    public void drainQueue(Object object, EventHandler<Object> handler) {
-        while (object != null) {
-            try {
-                handler.onEvent(object, 0, false);
-                object = queue.poll();
-            } catch (InterruptedException e) {
-                LOG.warn("Occur interrupt error, " + object);
-                break;
-            } catch (Exception e) {
-                throw new RuntimeException(e);
-            }
-        }
-    }
-
-    /** Waits for one element if the queue is empty, then drains the queue. */
-    public void consumeBatchWhenAvailable(EventHandler<Object> handler) {
-        Object object = queue.poll();
-        if (object == null) {
-            try {
-                object = queue.take();
-            } catch (InterruptedException e) {
-                LOG.warn("Occur interrupt error, " + object);
-            } catch (Exception e) {
-                throw new RuntimeException(e);
-            }
-        }
-
-        drainQueue(object, handler);
-    }
-
-    /**
-     * Drains the queue without waiting.
-     *
-     * @param cursor ignored by this implementation
-     */
-    public void consumeBatchToCursor(long cursor, EventHandler<Object> handler) {
-        Object object = queue.poll();
-        drainQueue(object, handler);
-    }
-
-    /**
-     * Adds {@code obj} to the queue, retrying every millisecond until the
-     * offer is accepted. Nothing is cached: consumerStarted() is a no-op here.
-     */
-    public void publish(Object obj) {
-        boolean isSuccess = queue.offer(obj);
-        while (isSuccess == false) {
-            try {
-                Thread.sleep(1);
-            } catch (InterruptedException e) {
-                // deliberately ignored: keep retrying until the offer succeeds
-            }
-            isSuccess = queue.offer(obj);
-        }
-    }
-
-    /**
-     * Single offer attempt.
-     *
-     * @throws InsufficientCapacityException if the queue rejects the element
-     */
-    public void tryPublish(Object obj) throws InsufficientCapacityException {
-        boolean isSuccess = queue.offer(obj);
-        if (isSuccess == false) {
-            throw InsufficientCapacityException.INSTANCE;
-        }
-    }
-
-    public void publish(Object obj, boolean block)
-            throws InsufficientCapacityException {
-        if (block == true) {
-            publish(obj);
-        } else {
-            tryPublish(obj);
-        }
-    }
-
-    /** No-op in this implementation. */
-    public void consumerStarted() {
-    }
-
-    public void clear() {
-        queue.clear();
-    }
-
-    public long population() {
-        return queue.size();
-    }
-
-    /** @return QUEUE_CAPACITY, or the current size once it has grown past that */
-    public long capacity() {
-        long used = queue.size();
-        if (used < QUEUE_CAPACITY) {
-            return QUEUE_CAPACITY;
-        } else {
-            return used;
-        }
-    }
-
-    // NOTE(review): writePos() is 0 and readPos() is the queue size, so
-    // writePos() - readPos() is negative here, unlike DisruptorQueueImpl where
-    // that difference is the population - confirm whether the two are swapped.
-    public long writePos() {
-        return 0;
-    }
-
-    public long readPos() {
-        return queue.size();
-    }
-
-    /** @return the fill ratio against QUEUE_CAPACITY, capped at 1.0 */
-    public float pctFull() {
-        long used = queue.size();
-        if (used < QUEUE_CAPACITY) {
-            return (1.0F * used / QUEUE_CAPACITY);
-        } else {
-            return 1.0f;
-        }
-    }
-
-    /**
-     * @return a new map with capacity, population, write_pos and read_pos
-     */
-    @Override
-    public Object getState() {
-        Map<String, Object> state = new HashMap<String, Object>();
-        long rp = readPos();
-        long wp = writePos();
-        state.put("capacity", capacity());
-        // writePos() - readPos() would be -size here, so report the real size
-        state.put("population", population());
-        state.put("write_pos", wp);
-        state.put("read_pos", rp);
-        return state;
-    }
-
-    /** Not used by this class; retained because it is public. */
-    public static class ObjectEventFactory implements
-            EventFactory<MutableObject> {
-        @Override
-        public MutableObject newInstance() {
-            return new MutableObject();
-        }
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/utils/IndifferentAccessMap.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/utils/IndifferentAccessMap.java b/jstorm-client/src/main/java/backtype/storm/utils/IndifferentAccessMap.java
deleted file mode 100644
index 74c4a63..0000000
--- a/jstorm-client/src/main/java/backtype/storm/utils/IndifferentAccessMap.java
+++ /dev/null
@@ -1,169 +0,0 @@
-package backtype.storm.utils;
-
-import clojure.lang.ILookup;
-import clojure.lang.ISeq;
-import clojure.lang.AFn;
-import clojure.lang.IPersistentMap;
-import clojure.lang.PersistentArrayMap;
-import clojure.lang.IMapEntry;
-import clojure.lang.IPersistentCollection;
-import clojure.lang.Keyword;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Collection;
-import java.util.Set;
-
-public class IndifferentAccessMap extends AFn implements ILookup,
-        IPersistentMap, Map {
-
-    /** Backing map that every operation delegates to. */
-    protected IPersistentMap _map;
-
-    /** Leaves the backing map unset; it can be installed later via setMap. */
-    protected IndifferentAccessMap() {
-    }
-
-    public IndifferentAccessMap(IPersistentMap map) {
-        setMap(map);
-    }
-
-    public IPersistentMap getMap() {
-        return _map;
-    }
-
-    /** Replaces the backing map and returns it. */
-    public IPersistentMap setMap(IPersistentMap map) {
-        this._map = map;
-        return this._map;
-    }
-
-    /** The backing map viewed through java.util.Map. */
-    private Map javaView() {
-        return (Map) getMap();
-    }
-
-    public int size() {
-        return javaView().size();
-    }
-
-    public int count() {
-        return size();
-    }
-
-    public ISeq seq() {
-        return getMap().seq();
-    }
-
-    /** Looks up a key; a Keyword is looked up by its name instead. */
-    @Override
-    public Object valAt(Object o) {
-        return (o instanceof Keyword)
-                ? valAt(((Keyword) o).getName())
-                : getMap().valAt(o);
-    }
-
-    /** Like valAt(Object), but yields def whenever the lookup result is null. */
-    @Override
-    public Object valAt(Object o, Object def) {
-        Object found = valAt(o);
-        return (found == null) ? def : found;
-    }
-
-    /* IFn */
-    @Override
-    public Object invoke(Object o) {
-        return valAt(o);
-    }
-
-    @Override
-    public Object invoke(Object o, Object notfound) {
-        return valAt(o, notfound);
-    }
-
-    /* IPersistentMap */
-    /* Naive implementation, but it might be good enough */
-    public IPersistentMap assoc(Object k, Object v) {
-        return (k instanceof Keyword)
-                ? assoc(((Keyword) k).getName(), v)
-                : new IndifferentAccessMap(getMap().assoc(k, v));
-    }
-
-    public IPersistentMap assocEx(Object k, Object v) {
-        return (k instanceof Keyword)
-                ? assocEx(((Keyword) k).getName(), v)
-                : new IndifferentAccessMap(getMap().assocEx(k, v));
-    }
-
-    public IPersistentMap without(Object k) {
-        return (k instanceof Keyword)
-                ? without(((Keyword) k).getName())
-                : new IndifferentAccessMap(getMap().without(k));
-    }
-
-    public boolean containsKey(Object k) {
-        return (k instanceof Keyword)
-                ? containsKey(((Keyword) k).getName())
-                : getMap().containsKey(k);
-    }
-
-    public IMapEntry entryAt(Object k) {
-        return (k instanceof Keyword)
-                ? entryAt(((Keyword) k).getName())
-                : getMap().entryAt(k);
-    }
-
-    public IPersistentCollection cons(Object o) {
-        return getMap().cons(o);
-    }
-
-    public IPersistentCollection empty() {
-        return new IndifferentAccessMap(PersistentArrayMap.EMPTY);
-    }
-
-    public boolean equiv(Object o) {
-        return getMap().equiv(o);
-    }
-
-    public Iterator iterator() {
-        return getMap().iterator();
-    }
-
-    /* Map */
-    public boolean containsValue(Object v) {
-        return javaView().containsValue(v);
-    }
-
-    public Set entrySet() {
-        return javaView().entrySet();
-    }
-
-    public Object get(Object k) {
-        return valAt(k);
-    }
-
-    public boolean isEmpty() {
-        return javaView().isEmpty();
-    }
-
-    public Set keySet() {
-        return javaView().keySet();
-    }
-
-    public Collection values() {
-        return javaView().values();
-    }
-
-    /* Not implemented: the mutating java.util.Map operations always throw */
-    public void clear() {
-        throw new UnsupportedOperationException();
-    }
-
-    public Object put(Object k, Object v) {
-        throw new UnsupportedOperationException();
-    }
-
-    public void putAll(Map m) {
-        throw new UnsupportedOperationException();
-    }
-
-    public Object remove(Object k) {
-        throw new UnsupportedOperationException();
-    }
-}