You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/11/05 21:40:56 UTC
[17/60] [abbrv] [partial] storm git commit: Release 2.0.4-SNAPSHOT
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/topology/TridentTopologyBuilder.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/topology/TridentTopologyBuilder.java b/jstorm-client/src/main/java/storm/trident/topology/TridentTopologyBuilder.java
deleted file mode 100644
index 1e75e00..0000000
--- a/jstorm-client/src/main/java/storm/trident/topology/TridentTopologyBuilder.java
+++ /dev/null
@@ -1,751 +0,0 @@
-package storm.trident.topology;
-
-import backtype.storm.generated.GlobalStreamId;
-import backtype.storm.generated.Grouping;
-import backtype.storm.generated.StormTopology;
-import backtype.storm.grouping.CustomStreamGrouping;
-import backtype.storm.topology.BaseConfigurationDeclarer;
-import backtype.storm.topology.BoltDeclarer;
-import backtype.storm.topology.IRichSpout;
-import backtype.storm.topology.InputDeclarer;
-import backtype.storm.topology.SpoutDeclarer;
-import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.tuple.Fields;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import org.apache.commons.lang.builder.ToStringBuilder;
-import org.apache.commons.lang.builder.ToStringStyle;
-import storm.trident.spout.BatchSpoutExecutor;
-import storm.trident.spout.IBatchSpout;
-import storm.trident.spout.ICommitterTridentSpout;
-import storm.trident.spout.ITridentSpout;
-import storm.trident.spout.RichSpoutBatchTriggerer;
-import storm.trident.spout.TridentSpoutCoordinator;
-import storm.trident.spout.TridentSpoutExecutor;
-import storm.trident.topology.TridentBoltExecutor.CoordSpec;
-import storm.trident.topology.TridentBoltExecutor.CoordType;
-
-// based on transactional topologies
-public class TridentTopologyBuilder {
- Map<GlobalStreamId, String> _batchIds = new HashMap();
- Map<String, TransactionalSpoutComponent> _spouts = new HashMap();
- Map<String, SpoutComponent> _batchPerTupleSpouts = new HashMap();
- Map<String, Component> _bolts = new HashMap();
-
-
- public SpoutDeclarer setBatchPerTupleSpout(String id, String streamName, IRichSpout spout, Integer parallelism, String batchGroup) {
- Map<String, String> batchGroups = new HashMap();
- batchGroups.put(streamName, batchGroup);
- markBatchGroups(id, batchGroups);
- SpoutComponent c = new SpoutComponent(spout, streamName, parallelism, batchGroup);
- _batchPerTupleSpouts.put(id, c);
- return new SpoutDeclarerImpl(c);
- }
-
- public SpoutDeclarer setSpout(String id, String streamName, String txStateId, IBatchSpout spout, Integer parallelism, String batchGroup) {
- return setSpout(id, streamName, txStateId, new BatchSpoutExecutor(spout), parallelism, batchGroup);
- }
-
- public SpoutDeclarer setSpout(String id, String streamName, String txStateId, ITridentSpout spout, Integer parallelism, String batchGroup) {
- Map<String, String> batchGroups = new HashMap();
- batchGroups.put(streamName, batchGroup);
- markBatchGroups(id, batchGroups);
-
- TransactionalSpoutComponent c = new TransactionalSpoutComponent(spout, streamName, parallelism, txStateId, batchGroup);
- _spouts.put(id, c);
- return new SpoutDeclarerImpl(c);
- }
-
- // map from stream name to batch id
- public BoltDeclarer setBolt(String id, ITridentBatchBolt bolt, Integer parallelism, Set<String> committerBatches, Map<String, String> batchGroups) {
- markBatchGroups(id, batchGroups);
- Component c = new Component(bolt, parallelism, committerBatches);
- _bolts.put(id, c);
- return new BoltDeclarerImpl(c);
-
- }
-
- String masterCoordinator(String batchGroup) {
- return "$mastercoord-" + batchGroup;
- }
-
- static final String SPOUT_COORD_PREFIX = "$spoutcoord-";
-
- public static String spoutCoordinator(String spoutId) {
- return SPOUT_COORD_PREFIX + spoutId;
- }
-
- public static String spoutIdFromCoordinatorId(String coordId) {
- return coordId.substring(SPOUT_COORD_PREFIX.length());
- }
-
- Map<GlobalStreamId, String> fleshOutStreamBatchIds(boolean includeCommitStream) {
- Map<GlobalStreamId, String> ret = new HashMap<GlobalStreamId, String>(_batchIds);
- Set<String> allBatches = new HashSet(_batchIds.values());
- for(String b: allBatches) {
- ret.put(new GlobalStreamId(masterCoordinator(b), MasterBatchCoordinator.BATCH_STREAM_ID), b);
- if(includeCommitStream) {
- ret.put(new GlobalStreamId(masterCoordinator(b), MasterBatchCoordinator.COMMIT_STREAM_ID), b);
- }
- // DO NOT include the success stream as part of the batch. it should not trigger coordination tuples,
- // and is just a metadata tuple to assist in cleanup, should not trigger batch tracking
- }
-
- for(String id: _spouts.keySet()) {
- TransactionalSpoutComponent c = _spouts.get(id);
- if(c.batchGroupId!=null) {
- ret.put(new GlobalStreamId(spoutCoordinator(id), MasterBatchCoordinator.BATCH_STREAM_ID), c.batchGroupId);
- }
- }
-
- //this takes care of setting up coord streams for spouts and bolts
- for(GlobalStreamId s: _batchIds.keySet()) {
- String b = _batchIds.get(s);
- ret.put(new GlobalStreamId(s.get_componentId(), TridentBoltExecutor.COORD_STREAM(b)), b);
- }
-
- return ret;
- }
-
- public StormTopology buildTopology() {
- TopologyBuilder builder = new TopologyBuilder();
- Map<GlobalStreamId, String> batchIdsForSpouts = fleshOutStreamBatchIds(false);
- Map<GlobalStreamId, String> batchIdsForBolts = fleshOutStreamBatchIds(true);
-
- Map<String, List<String>> batchesToCommitIds = new HashMap<String, List<String>>();
- Map<String, List<ITridentSpout>> batchesToSpouts = new HashMap<String, List<ITridentSpout>>();
-
- for(String id: _spouts.keySet()) {
- TransactionalSpoutComponent c = _spouts.get(id);
- if(c.spout instanceof IRichSpout) {
-
- //TODO: wrap this to set the stream name
- builder.setSpout(id, (IRichSpout) c.spout, c.parallelism);
- } else {
- String batchGroup = c.batchGroupId;
- if(!batchesToCommitIds.containsKey(batchGroup)) {
- batchesToCommitIds.put(batchGroup, new ArrayList<String>());
- }
- batchesToCommitIds.get(batchGroup).add(c.commitStateId);
-
- if(!batchesToSpouts.containsKey(batchGroup)) {
- batchesToSpouts.put(batchGroup, new ArrayList<ITridentSpout>());
- }
- batchesToSpouts.get(batchGroup).add((ITridentSpout) c.spout);
-
-
- BoltDeclarer scd =
- builder.setBolt(spoutCoordinator(id), new TridentSpoutCoordinator(c.commitStateId, (ITridentSpout) c.spout))
- .globalGrouping(masterCoordinator(c.batchGroupId), MasterBatchCoordinator.BATCH_STREAM_ID)
- .globalGrouping(masterCoordinator(c.batchGroupId), MasterBatchCoordinator.SUCCESS_STREAM_ID)
- .globalGrouping(masterCoordinator(c.batchGroupId), MasterBatchCoordinator.COMMIT_STREAM_ID);
-
- for(Map m: c.componentConfs) {
- scd.addConfigurations(m);
- }
-
- Map<String, TridentBoltExecutor.CoordSpec> specs = new HashMap();
- specs.put(c.batchGroupId, new CoordSpec());
- BoltDeclarer bd = builder.setBolt(id,
- new TridentBoltExecutor(
- new TridentSpoutExecutor(
- c.commitStateId,
- c.streamName,
- ((ITridentSpout) c.spout)),
- batchIdsForSpouts,
- specs),
- c.parallelism);
- bd.allGrouping(spoutCoordinator(id), MasterBatchCoordinator.BATCH_STREAM_ID);
- bd.allGrouping(masterCoordinator(batchGroup), MasterBatchCoordinator.SUCCESS_STREAM_ID);
- if(c.spout instanceof ICommitterTridentSpout) {
- bd.allGrouping(masterCoordinator(batchGroup), MasterBatchCoordinator.COMMIT_STREAM_ID);
- }
- for(Map m: c.componentConfs) {
- bd.addConfigurations(m);
- }
- }
- }
-
- for(String id: _batchPerTupleSpouts.keySet()) {
- SpoutComponent c = _batchPerTupleSpouts.get(id);
- SpoutDeclarer d = builder.setSpout(id, new RichSpoutBatchTriggerer((IRichSpout) c.spout, c.streamName, c.batchGroupId), c.parallelism);
-
- for(Map conf: c.componentConfs) {
- d.addConfigurations(conf);
- }
- }
-
- for(String id: _bolts.keySet()) {
- Component c = _bolts.get(id);
-
- Map<String, CoordSpec> specs = new HashMap();
-
- for(GlobalStreamId s: getBoltSubscriptionStreams(id)) {
- String batch = batchIdsForBolts.get(s);
- if(!specs.containsKey(batch)) specs.put(batch, new CoordSpec());
- CoordSpec spec = specs.get(batch);
- CoordType ct;
- if(_batchPerTupleSpouts.containsKey(s.get_componentId())) {
- ct = CoordType.single();
- } else {
- ct = CoordType.all();
- }
- spec.coords.put(s.get_componentId(), ct);
- }
-
- for(String b: c.committerBatches) {
- specs.get(b).commitStream = new GlobalStreamId(masterCoordinator(b), MasterBatchCoordinator.COMMIT_STREAM_ID);
- }
-
- BoltDeclarer d = builder.setBolt(id, new TridentBoltExecutor(c.bolt, batchIdsForBolts, specs), c.parallelism);
- for(Map conf: c.componentConfs) {
- d.addConfigurations(conf);
- }
-
- for(InputDeclaration inputDecl: c.declarations) {
- inputDecl.declare(d);
- }
-
- Map<String, Set<String>> batchToComponents = getBoltBatchToComponentSubscriptions(id);
- for(String b: batchToComponents.keySet()) {
- for(String comp: batchToComponents.get(b)) {
- d.directGrouping(comp, TridentBoltExecutor.COORD_STREAM(b));
- }
- }
-
- for(String b: c.committerBatches) {
- d.allGrouping(masterCoordinator(b), MasterBatchCoordinator.COMMIT_STREAM_ID);
- }
- }
-
- for(String batch: batchesToCommitIds.keySet()) {
- List<String> commitIds = batchesToCommitIds.get(batch);
- boolean batchCommit = false;
- builder.setSpout(masterCoordinator(batch), new MasterBatchCoordinator(commitIds, batchesToSpouts.get(batch)));
- }
-
- return builder.createTopology();
- }
-
- private void markBatchGroups(String component, Map<String, String> batchGroups) {
- for(String stream: batchGroups.keySet()) {
- _batchIds.put(new GlobalStreamId(component, stream), batchGroups.get(stream));
- }
- }
-
-
- private static class SpoutComponent {
- public Object spout;
- public Integer parallelism;
- public List<Map> componentConfs = new ArrayList<Map>();
- String batchGroupId;
- String streamName;
-
- public SpoutComponent(Object spout, String streamName, Integer parallelism, String batchGroupId) {
- this.spout = spout;
- this.streamName = streamName;
- this.parallelism = parallelism;
- this.batchGroupId = batchGroupId;
- }
-
- @Override
- public String toString() {
- return ToStringBuilder.reflectionToString(this);
- }
- }
-
- private static class TransactionalSpoutComponent extends SpoutComponent {
- public String commitStateId;
-
- public TransactionalSpoutComponent(Object spout, String streamName, Integer parallelism, String commitStateId, String batchGroupId) {
- super(spout, streamName, parallelism, batchGroupId);
- this.commitStateId = commitStateId;
- }
-
- @Override
- public String toString() {
- return ToStringBuilder.reflectionToString(this, ToStringStyle.MULTI_LINE_STYLE);
- }
- }
-
- private static class Component {
- public ITridentBatchBolt bolt;
- public Integer parallelism;
- public List<InputDeclaration> declarations = new ArrayList<InputDeclaration>();
- public List<Map> componentConfs = new ArrayList<Map>();
- public Set<String> committerBatches;
-
- public Component(ITridentBatchBolt bolt, Integer parallelism,Set<String> committerBatches) {
- this.bolt = bolt;
- this.parallelism = parallelism;
- this.committerBatches = committerBatches;
- }
-
- @Override
- public String toString() {
- return ToStringBuilder.reflectionToString(this, ToStringStyle.MULTI_LINE_STYLE);
- }
- }
-
- Map<String, Set<String>> getBoltBatchToComponentSubscriptions(String id) {
- Map<String, Set<String>> ret = new HashMap();
- for(GlobalStreamId s: getBoltSubscriptionStreams(id)) {
- String b = _batchIds.get(s);
- if(!ret.containsKey(b)) ret.put(b, new HashSet());
- ret.get(b).add(s.get_componentId());
- }
- return ret;
- }
-
- List<GlobalStreamId> getBoltSubscriptionStreams(String id) {
- List<GlobalStreamId> ret = new ArrayList();
- Component c = _bolts.get(id);
- for(InputDeclaration d: c.declarations) {
- ret.add(new GlobalStreamId(d.getComponent(), d.getStream()));
- }
- return ret;
- }
-
- private static interface InputDeclaration {
- void declare(InputDeclarer declarer);
- String getComponent();
- String getStream();
- }
-
- private class SpoutDeclarerImpl extends BaseConfigurationDeclarer<SpoutDeclarer> implements SpoutDeclarer {
- SpoutComponent _component;
-
- public SpoutDeclarerImpl(SpoutComponent component) {
- _component = component;
- }
-
- @Override
- public SpoutDeclarer addConfigurations(Map conf) {
- _component.componentConfs.add(conf);
- return this;
- }
- }
-
- private class BoltDeclarerImpl extends BaseConfigurationDeclarer<BoltDeclarer> implements BoltDeclarer {
- Component _component;
-
- public BoltDeclarerImpl(Component component) {
- _component = component;
- }
-
- @Override
- public BoltDeclarer fieldsGrouping(final String component, final Fields fields) {
- addDeclaration(new InputDeclaration() {
- @Override
- public void declare(InputDeclarer declarer) {
- declarer.fieldsGrouping(component, fields);
- }
-
- @Override
- public String getComponent() {
- return component;
- }
-
- @Override
- public String getStream() {
- return null;
- }
- });
- return this;
- }
-
- @Override
- public BoltDeclarer fieldsGrouping(final String component, final String streamId, final Fields fields) {
- addDeclaration(new InputDeclaration() {
- @Override
- public void declare(InputDeclarer declarer) {
- declarer.fieldsGrouping(component, streamId, fields);
- }
-
- @Override
- public String getComponent() {
- return component;
- }
-
- @Override
- public String getStream() {
- return streamId;
- }
- });
- return this;
- }
-
- @Override
- public BoltDeclarer globalGrouping(final String component) {
- addDeclaration(new InputDeclaration() {
- @Override
- public void declare(InputDeclarer declarer) {
- declarer.globalGrouping(component);
- }
-
- @Override
- public String getComponent() {
- return component;
- }
-
- @Override
- public String getStream() {
- return null;
- }
- });
- return this;
- }
-
- @Override
- public BoltDeclarer globalGrouping(final String component, final String streamId) {
- addDeclaration(new InputDeclaration() {
- @Override
- public void declare(InputDeclarer declarer) {
- declarer.globalGrouping(component, streamId);
- }
-
- @Override
- public String getComponent() {
- return component;
- }
-
- @Override
- public String getStream() {
- return streamId;
- }
- });
- return this;
- }
-
- @Override
- public BoltDeclarer shuffleGrouping(final String component) {
- addDeclaration(new InputDeclaration() {
- @Override
- public void declare(InputDeclarer declarer) {
- declarer.shuffleGrouping(component);
- }
-
- @Override
- public String getComponent() {
- return component;
- }
-
- @Override
- public String getStream() {
- return null;
- }
- });
- return this;
- }
-
- @Override
- public BoltDeclarer shuffleGrouping(final String component, final String streamId) {
- addDeclaration(new InputDeclaration() {
- @Override
- public void declare(InputDeclarer declarer) {
- declarer.shuffleGrouping(component, streamId);
- }
-
- @Override
- public String getComponent() {
- return component;
- }
-
- @Override
- public String getStream() {
- return streamId;
- }
- });
- return this;
- }
-
- @Override
- public BoltDeclarer localOrShuffleGrouping(final String component) {
- addDeclaration(new InputDeclaration() {
- @Override
- public void declare(InputDeclarer declarer) {
- declarer.localOrShuffleGrouping(component);
- }
-
- @Override
- public String getComponent() {
- return component;
- }
-
- @Override
- public String getStream() {
- return null;
- }
- });
- return this;
- }
-
- @Override
- public BoltDeclarer localOrShuffleGrouping(final String component, final String streamId) {
- addDeclaration(new InputDeclaration() {
- @Override
- public void declare(InputDeclarer declarer) {
- declarer.localOrShuffleGrouping(component, streamId);
- }
-
- @Override
- public String getComponent() {
- return component;
- }
-
- @Override
- public String getStream() {
- return streamId;
- }
- });
- return this;
- }
-
- @Override
- public BoltDeclarer localFirstGrouping(final String component) {
- addDeclaration(new InputDeclaration() {
- @Override
- public void declare(InputDeclarer declarer) {
- declarer.localFirstGrouping(component);
- }
-
- @Override
- public String getComponent() {
- return component;
- }
-
- @Override
- public String getStream() {
- return null;
- }
- });
- return this;
- }
-
- @Override
- public BoltDeclarer localFirstGrouping(final String component, final String streamId) {
- addDeclaration(new InputDeclaration() {
- @Override
- public void declare(InputDeclarer declarer) {
- declarer.localFirstGrouping(component, streamId);
- }
-
- @Override
- public String getComponent() {
- return component;
- }
-
- @Override
- public String getStream() {
- return streamId;
- }
- });
- return this;
- }
-
- @Override
- public BoltDeclarer noneGrouping(final String component) {
- addDeclaration(new InputDeclaration() {
- @Override
- public void declare(InputDeclarer declarer) {
- declarer.noneGrouping(component);
- }
-
- @Override
- public String getComponent() {
- return component;
- }
-
- @Override
- public String getStream() {
- return null;
- }
- });
- return this;
- }
-
- @Override
- public BoltDeclarer noneGrouping(final String component, final String streamId) {
- addDeclaration(new InputDeclaration() {
- @Override
- public void declare(InputDeclarer declarer) {
- declarer.noneGrouping(component, streamId);
- }
-
- @Override
- public String getComponent() {
- return component;
- }
-
- @Override
- public String getStream() {
- return streamId;
- }
- });
- return this;
- }
-
- @Override
- public BoltDeclarer allGrouping(final String component) {
- addDeclaration(new InputDeclaration() {
- @Override
- public void declare(InputDeclarer declarer) {
- declarer.allGrouping(component);
- }
-
- @Override
- public String getComponent() {
- return component;
- }
-
- @Override
- public String getStream() {
- return null;
- }
- });
- return this;
- }
-
- @Override
- public BoltDeclarer allGrouping(final String component, final String streamId) {
- addDeclaration(new InputDeclaration() {
- @Override
- public void declare(InputDeclarer declarer) {
- declarer.allGrouping(component, streamId);
- }
-
- @Override
- public String getComponent() {
- return component;
- }
-
- @Override
- public String getStream() {
- return streamId;
- }
- });
- return this;
- }
-
- @Override
- public BoltDeclarer directGrouping(final String component) {
- addDeclaration(new InputDeclaration() {
- @Override
- public void declare(InputDeclarer declarer) {
- declarer.directGrouping(component);
- }
-
- @Override
- public String getComponent() {
- return component;
- }
-
- @Override
- public String getStream() {
- return null;
- }
- });
- return this;
- }
-
- @Override
- public BoltDeclarer directGrouping(final String component, final String streamId) {
- addDeclaration(new InputDeclaration() {
- @Override
- public void declare(InputDeclarer declarer) {
- declarer.directGrouping(component, streamId);
- }
-
- @Override
- public String getComponent() {
- return component;
- }
-
- @Override
- public String getStream() {
- return streamId;
- }
- });
- return this;
- }
-
- @Override
- public BoltDeclarer customGrouping(final String component, final CustomStreamGrouping grouping) {
- addDeclaration(new InputDeclaration() {
- @Override
- public void declare(InputDeclarer declarer) {
- declarer.customGrouping(component, grouping);
- }
-
- @Override
- public String getComponent() {
- return component;
- }
-
- @Override
- public String getStream() {
- return null;
- }
- });
- return this;
- }
-
- @Override
- public BoltDeclarer customGrouping(final String component, final String streamId, final CustomStreamGrouping grouping) {
- addDeclaration(new InputDeclaration() {
- @Override
- public void declare(InputDeclarer declarer) {
- declarer.customGrouping(component, streamId, grouping);
- }
-
- @Override
- public String getComponent() {
- return component;
- }
-
- @Override
- public String getStream() {
- return streamId;
- }
- });
- return this;
- }
-
- @Override
- public BoltDeclarer grouping(final GlobalStreamId stream, final Grouping grouping) {
- addDeclaration(new InputDeclaration() {
- @Override
- public void declare(InputDeclarer declarer) {
- declarer.grouping(stream, grouping);
- }
-
- @Override
- public String getComponent() {
- return stream.get_componentId();
- }
-
- @Override
- public String getStream() {
- return stream.get_streamId();
- }
- });
- return this;
- }
-
- private void addDeclaration(InputDeclaration declaration) {
- _component.declarations.add(declaration);
- }
-
- @Override
- public BoltDeclarer addConfigurations(Map conf) {
- _component.componentConfs.add(conf);
- return this;
- }
-
-
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/topology/state/RotatingTransactionalState.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/topology/state/RotatingTransactionalState.java b/jstorm-client/src/main/java/storm/trident/topology/state/RotatingTransactionalState.java
deleted file mode 100644
index 9f22cc7..0000000
--- a/jstorm-client/src/main/java/storm/trident/topology/state/RotatingTransactionalState.java
+++ /dev/null
@@ -1,130 +0,0 @@
-package storm.trident.topology.state;
-
-import backtype.storm.utils.Utils;
-import org.apache.zookeeper.KeeperException;
-
-import java.util.HashSet;
-import java.util.List;
-import java.util.SortedMap;
-import java.util.TreeMap;
-
-public class RotatingTransactionalState {
- public static interface StateInitializer {
- Object init(long txid, Object lastState);
- }
-
- private TransactionalState _state;
- private String _subdir;
-
- private TreeMap<Long, Object> _curr = new TreeMap<Long, Object>();
-
- public RotatingTransactionalState(TransactionalState state, String subdir) {
- _state = state;
- _subdir = subdir;
- state.mkdir(subdir);
- sync();
- }
-
-
- public Object getLastState() {
- if(_curr.isEmpty()) return null;
- else return _curr.lastEntry().getValue();
- }
-
- public void overrideState(long txid, Object state) {
- _state.setData(txPath(txid), state);
- _curr.put(txid, state);
- }
-
- public void removeState(long txid) {
- if(_curr.containsKey(txid)) {
- _curr.remove(txid);
- _state.delete(txPath(txid));
- }
- }
-
- public Object getState(long txid) {
- return _curr.get(txid);
- }
-
- public Object getState(long txid, StateInitializer init) {
- if(!_curr.containsKey(txid)) {
- SortedMap<Long, Object> prevMap = _curr.headMap(txid);
- SortedMap<Long, Object> afterMap = _curr.tailMap(txid);
-
- Long prev = null;
- if(!prevMap.isEmpty()) prev = prevMap.lastKey();
-
- Object data;
- if(afterMap.isEmpty()) {
- Object prevData;
- if(prev!=null) {
- prevData = _curr.get(prev);
- } else {
- prevData = null;
- }
- data = init.init(txid, prevData);
- } else {
- data = null;
- }
- _curr.put(txid, data);
- _state.setData(txPath(txid), data);
- }
- return _curr.get(txid);
- }
-
- public Object getPreviousState(long txid) {
- SortedMap<Long, Object> prevMap = _curr.headMap(txid);
- if(prevMap.isEmpty()) return null;
- else return prevMap.get(prevMap.lastKey());
- }
-
- public boolean hasCache(long txid) {
- return _curr.containsKey(txid);
- }
-
- /**
- * Returns null if it was created, the value otherwise.
- */
- public Object getStateOrCreate(long txid, StateInitializer init) {
- if(_curr.containsKey(txid)) {
- return _curr.get(txid);
- } else {
- getState(txid, init);
- return null;
- }
- }
-
- public void cleanupBefore(long txid) {
- SortedMap<Long, Object> toDelete = _curr.headMap(txid);
- for(long tx: new HashSet<Long>(toDelete.keySet())) {
- _curr.remove(tx);
- try {
- _state.delete(txPath(tx));
- } catch(RuntimeException e) {
- // Ignore NoNodeExists exceptions because when sync() it may populate _curr with stale data since
- // zookeeper reads are eventually consistent.
- if(!Utils.exceptionCauseIsInstanceOf(KeeperException.NoNodeException.class, e)) {
- throw e;
- }
- }
- }
- }
-
- private void sync() {
- List<String> txids = _state.list(_subdir);
- for(String txid_s: txids) {
- Object data = _state.getData(txPath(txid_s));
- _curr.put(Long.parseLong(txid_s), data);
- }
- }
-
- private String txPath(long tx) {
- return txPath("" + tx);
- }
-
- private String txPath(String tx) {
- return _subdir + "/" + tx;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/topology/state/TransactionalState.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/topology/state/TransactionalState.java b/jstorm-client/src/main/java/storm/trident/topology/state/TransactionalState.java
deleted file mode 100644
index 44d4282..0000000
--- a/jstorm-client/src/main/java/storm/trident/topology/state/TransactionalState.java
+++ /dev/null
@@ -1,119 +0,0 @@
-package storm.trident.topology.state;
-
-
-import java.io.UnsupportedEncodingException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-
-import backtype.storm.Config;
-import backtype.storm.utils.Utils;
-
-public class TransactionalState {
- CuratorFramework _curator;
-
- public static TransactionalState newUserState(Map conf, String id) {
- return new TransactionalState(conf, id, "user");
- }
-
- public static TransactionalState newCoordinatorState(Map conf, String id) {
- return new TransactionalState(conf, id, "coordinator");
- }
-
- protected TransactionalState(Map conf, String id, String subroot) {
- try {
- conf = new HashMap(conf);
- String rootDir = conf.get(Config.TRANSACTIONAL_ZOOKEEPER_ROOT) + "/" + id + "/" + subroot;
- List<String> servers = (List<String>) getWithBackup(conf, Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, Config.STORM_ZOOKEEPER_SERVERS);
- Object port = getWithBackup(conf, Config.TRANSACTIONAL_ZOOKEEPER_PORT, Config.STORM_ZOOKEEPER_PORT);
- CuratorFramework initter = Utils.newCuratorStarted(conf, servers, port);
- try {
- initter.create().creatingParentsIfNeeded().forPath(rootDir);
- } catch(KeeperException.NodeExistsException e) {
-
- }
-
- initter.close();
-
- _curator = Utils.newCuratorStarted(conf, servers, port, rootDir);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- public void setData(String path, Object obj) {
- path = "/" + path;
- byte[] ser;
- try {
- ser = Utils.to_json(obj).getBytes("UTF-8");
- } catch (UnsupportedEncodingException e) {
- throw new RuntimeException(e);
- }
- try {
- if(_curator.checkExists().forPath(path)!=null) {
- _curator.setData().forPath(path, ser);
- } else {
- _curator.create()
- .creatingParentsIfNeeded()
- .withMode(CreateMode.PERSISTENT)
- .forPath(path, ser);
- }
- } catch(Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- public void delete(String path) {
- path = "/" + path;
- try {
- _curator.delete().forPath(path);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- public List<String> list(String path) {
- path = "/" + path;
- try {
- if(_curator.checkExists().forPath(path)==null) {
- return new ArrayList<String>();
- } else {
- return _curator.getChildren().forPath(path);
- }
- } catch(Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- public void mkdir(String path) {
- setData(path, 7);
- }
-
- public Object getData(String path) {
- path = "/" + path;
- try {
- if(_curator.checkExists().forPath(path)!=null) {
- return Utils.from_json(new String(_curator.getData().forPath(path), "UTF-8"));
- } else {
- return null;
- }
- } catch(Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- public void close() {
- _curator.close();
- }
-
- private Object getWithBackup(Map amap, Object primary, Object backup) {
- Object ret = amap.get(primary);
- if(ret==null) return amap.get(backup);
- return ret;
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/tuple/ComboList.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/tuple/ComboList.java b/jstorm-client/src/main/java/storm/trident/tuple/ComboList.java
deleted file mode 100644
index 0221579..0000000
--- a/jstorm-client/src/main/java/storm/trident/tuple/ComboList.java
+++ /dev/null
@@ -1,75 +0,0 @@
-package storm.trident.tuple;
-
-import java.io.Serializable;
-import java.util.AbstractList;
-import java.util.List;
-import org.apache.commons.lang.builder.ToStringBuilder;
-
-
-public class ComboList extends AbstractList<Object> {
- public static class Factory implements Serializable {
- Pointer[] index;
- int[] sizes;
-
- public Factory(int... sizes) {
- this.sizes = sizes;
- int total = 0;
- for(int size: sizes) {
- total+=size;
- }
- index = new Pointer[total];
- int i=0;
- int j=0;
- for(int size: sizes) {
- for(int z=0; z<size; z++) {
- index[j] = new Pointer(i, z);
- j++;
- }
- i++;
- }
- }
-
- public ComboList create(List[] delegates) {
- if(delegates.length!=sizes.length) {
- throw new RuntimeException("Expected " + sizes.length + " lists, but instead got " + delegates.length + " lists");
- }
- for(int i=0; i<delegates.length; i++) {
- List l = delegates[i];
- if(l==null || l.size() != sizes[i]) {
- throw new RuntimeException("Got unexpected delegates to ComboList: " + ToStringBuilder.reflectionToString(delegates));
- }
- }
- return new ComboList(delegates, index);
- }
- }
-
- private static class Pointer implements Serializable {
- int listIndex;
- int subIndex;
-
- public Pointer(int listIndex, int subIndex) {
- this.listIndex = listIndex;
- this.subIndex = subIndex;
- }
-
- }
-
- Pointer[] _index;
- List[] _delegates;
-
- public ComboList(List[] delegates, Pointer[] index) {
- _index = index;
- _delegates = delegates;
- }
-
- @Override
- public Object get(int i) {
- Pointer ptr = _index[i];
- return _delegates[ptr.listIndex].get(ptr.subIndex);
- }
-
- @Override
- public int size() {
- return _index.length;
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/tuple/ConsList.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/tuple/ConsList.java b/jstorm-client/src/main/java/storm/trident/tuple/ConsList.java
deleted file mode 100644
index 72fd3d3..0000000
--- a/jstorm-client/src/main/java/storm/trident/tuple/ConsList.java
+++ /dev/null
@@ -1,27 +0,0 @@
-package storm.trident.tuple;
-
-import java.util.AbstractList;
-import java.util.List;
-
-public class ConsList extends AbstractList<Object> {
- List<Object> _elems;
- Object _first;
-
- public ConsList(Object o, List<Object> elems) {
- _elems = elems;
- _first = o;
- }
-
- @Override
- public Object get(int i) {
- if(i==0) return _first;
- else {
- return _elems.get(i - 1);
- }
- }
-
- @Override
- public int size() {
- return _elems.size() + 1;
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/tuple/TridentTuple.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/tuple/TridentTuple.java b/jstorm-client/src/main/java/storm/trident/tuple/TridentTuple.java
deleted file mode 100644
index 9159ce7..0000000
--- a/jstorm-client/src/main/java/storm/trident/tuple/TridentTuple.java
+++ /dev/null
@@ -1,17 +0,0 @@
-package storm.trident.tuple;
-
-import backtype.storm.tuple.ITuple;
-
-import java.io.Serializable;
-import java.util.List;
-import java.util.Map;
-
-public interface TridentTuple extends ITuple, List<Object> {
-
- public static interface Factory extends Serializable {
- Map<String, ValuePointer> getFieldIndex();
- List<String> getOutputFields();
- int numDelegates();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/tuple/TridentTupleView.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/tuple/TridentTupleView.java b/jstorm-client/src/main/java/storm/trident/tuple/TridentTupleView.java
deleted file mode 100644
index 17f3e3f..0000000
--- a/jstorm-client/src/main/java/storm/trident/tuple/TridentTupleView.java
+++ /dev/null
@@ -1,342 +0,0 @@
-package storm.trident.tuple;
-
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import clojure.lang.IPersistentVector;
-import clojure.lang.PersistentVector;
-import clojure.lang.RT;
-import java.util.AbstractList;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.Arrays;
-
-//extends abstractlist so that it can be emitted directly as Storm tuples
-public class TridentTupleView extends AbstractList<Object> implements TridentTuple {
- ValuePointer[] _index;
- Map<String, ValuePointer> _fieldIndex;
- IPersistentVector _delegates;
-
- public static class ProjectionFactory implements Factory {
- Map<String, ValuePointer> _fieldIndex;
- ValuePointer[] _index;
- Factory _parent;
-
- public ProjectionFactory(Factory parent, Fields projectFields) {
- _parent = parent;
- if(projectFields==null) projectFields = new Fields();
- Map<String, ValuePointer> parentFieldIndex = parent.getFieldIndex();
- _fieldIndex = new HashMap<String, ValuePointer>();
- for(String f: projectFields) {
- _fieldIndex.put(f, parentFieldIndex.get(f));
- }
- _index = ValuePointer.buildIndex(projectFields, _fieldIndex);
- }
-
- public TridentTuple create(TridentTuple parent) {
- if(_index.length==0) return EMPTY_TUPLE;
- else return new TridentTupleView(((TridentTupleView)parent)._delegates, _index, _fieldIndex);
- }
-
- @Override
- public Map<String, ValuePointer> getFieldIndex() {
- return _fieldIndex;
- }
-
- @Override
- public int numDelegates() {
- return _parent.numDelegates();
- }
-
- @Override
- public List<String> getOutputFields() {
- return indexToFieldsList(_index);
- }
- }
-
- public static class FreshOutputFactory implements Factory {
- Map<String, ValuePointer> _fieldIndex;
- ValuePointer[] _index;
-
- public FreshOutputFactory(Fields selfFields) {
- _fieldIndex = new HashMap<String, ValuePointer>();
- for(int i=0; i<selfFields.size(); i++) {
- String field = selfFields.get(i);
- _fieldIndex.put(field, new ValuePointer(0, i, field));
- }
- _index = ValuePointer.buildIndex(selfFields, _fieldIndex);
- }
-
- public TridentTuple create(List<Object> selfVals) {
- return new TridentTupleView(PersistentVector.EMPTY.cons(selfVals), _index, _fieldIndex);
- }
-
- @Override
- public Map<String, ValuePointer> getFieldIndex() {
- return _fieldIndex;
- }
-
- @Override
- public int numDelegates() {
- return 1;
- }
-
- @Override
- public List<String> getOutputFields() {
- return indexToFieldsList(_index);
- }
- }
-
- public static class OperationOutputFactory implements Factory {
- Map<String, ValuePointer> _fieldIndex;
- ValuePointer[] _index;
- Factory _parent;
-
- public OperationOutputFactory(Factory parent, Fields selfFields) {
- _parent = parent;
- _fieldIndex = new HashMap(parent.getFieldIndex());
- int myIndex = parent.numDelegates();
- for(int i=0; i<selfFields.size(); i++) {
- String field = selfFields.get(i);
- _fieldIndex.put(field, new ValuePointer(myIndex, i, field));
- }
- List<String> myOrder = new ArrayList<String>(parent.getOutputFields());
-
- Set<String> parentFieldsSet = new HashSet<String>(myOrder);
- for(String f: selfFields) {
- if(parentFieldsSet.contains(f)) {
- throw new IllegalArgumentException(
- "Additive operations cannot add fields with same name as already exists. "
- + "Tried adding " + selfFields + " to " + parent.getOutputFields());
- }
- myOrder.add(f);
- }
-
- _index = ValuePointer.buildIndex(new Fields(myOrder), _fieldIndex);
- }
-
- public TridentTuple create(TridentTupleView parent, List<Object> selfVals) {
- IPersistentVector curr = parent._delegates;
- curr = (IPersistentVector) RT.conj(curr, selfVals);
- return new TridentTupleView(curr, _index, _fieldIndex);
- }
-
- @Override
- public Map<String, ValuePointer> getFieldIndex() {
- return _fieldIndex;
- }
-
- @Override
- public int numDelegates() {
- return _parent.numDelegates() + 1;
- }
-
- @Override
- public List<String> getOutputFields() {
- return indexToFieldsList(_index);
- }
- }
-
- public static class RootFactory implements Factory {
- ValuePointer[] index;
- Map<String, ValuePointer> fieldIndex;
-
- public RootFactory(Fields inputFields) {
- index = new ValuePointer[inputFields.size()];
- int i=0;
- for(String f: inputFields) {
- index[i] = new ValuePointer(0, i, f);
- i++;
- }
- fieldIndex = ValuePointer.buildFieldIndex(index);
- }
-
- public TridentTuple create(Tuple parent) {
- return new TridentTupleView(PersistentVector.EMPTY.cons(parent.getValues()), index, fieldIndex);
- }
-
- @Override
- public Map<String, ValuePointer> getFieldIndex() {
- return fieldIndex;
- }
-
- @Override
- public int numDelegates() {
- return 1;
- }
-
- @Override
- public List<String> getOutputFields() {
- return indexToFieldsList(this.index);
- }
- }
-
- private static List<String> indexToFieldsList(ValuePointer[] index) {
- List<String> ret = new ArrayList<String>();
- for(ValuePointer p: index) {
- ret.add(p.field);
- }
- return ret;
- }
-
- public static TridentTupleView EMPTY_TUPLE = new TridentTupleView(null, new ValuePointer[0], new HashMap());
-
- // index and fieldIndex are precomputed, delegates built up over many operations using persistent data structures
- public TridentTupleView(IPersistentVector delegates, ValuePointer[] index, Map<String, ValuePointer> fieldIndex) {
- _delegates = delegates;
- _index = index;
- _fieldIndex = fieldIndex;
- }
-
- public static TridentTuple createFreshTuple(Fields fields, List<Object> values) {
- FreshOutputFactory factory = new FreshOutputFactory(fields);
- return factory.create(values);
- }
-
- public static TridentTuple createFreshTuple(Fields fields, Object... values) {
- FreshOutputFactory factory = new FreshOutputFactory(fields);
- return factory.create(Arrays.asList(values));
- }
-
- @Override
- public List<Object> getValues() {
- return this;
- }
-
- @Override
- public int size() {
- return _index.length;
- }
-
- @Override
- public boolean contains(String field) {
- return getFields().contains(field);
- }
-
- @Override
- public Fields getFields() {
- return new Fields(indexToFieldsList(_index));
- }
-
- @Override
- public int fieldIndex(String field) {
- return getFields().fieldIndex(field);
- }
-
- @Override
- public List<Object> select(Fields selector) {
- return getFields().select(selector, getValues());
- }
-
- @Override
- public Object get(int i) {
- return getValue(i);
- }
-
- @Override
- public Object getValue(int i) {
- return getValueByPointer(_index[i]);
- }
-
- @Override
- public String getString(int i) {
- return (String) getValue(i);
- }
-
- @Override
- public Integer getInteger(int i) {
- return (Integer) getValue(i);
- }
-
- @Override
- public Long getLong(int i) {
- return (Long) getValue(i);
- }
-
- @Override
- public Boolean getBoolean(int i) {
- return (Boolean) getValue(i);
- }
-
- @Override
- public Short getShort(int i) {
- return (Short) getValue(i);
- }
-
- @Override
- public Byte getByte(int i) {
- return (Byte) getValue(i);
- }
-
- @Override
- public Double getDouble(int i) {
- return (Double) getValue(i);
- }
-
- @Override
- public Float getFloat(int i) {
- return (Float) getValue(i);
- }
-
- @Override
- public byte[] getBinary(int i) {
- return (byte[]) getValue(i);
- }
-
- @Override
- public Object getValueByField(String field) {
- return getValueByPointer(_fieldIndex.get(field));
- }
-
- @Override
- public String getStringByField(String field) {
- return (String) getValueByField(field);
- }
-
- @Override
- public Integer getIntegerByField(String field) {
- return (Integer) getValueByField(field);
- }
-
- @Override
- public Long getLongByField(String field) {
- return (Long) getValueByField(field);
- }
-
- @Override
- public Boolean getBooleanByField(String field) {
- return (Boolean) getValueByField(field);
- }
-
- @Override
- public Short getShortByField(String field) {
- return (Short) getValueByField(field);
- }
-
- @Override
- public Byte getByteByField(String field) {
- return (Byte) getValueByField(field);
- }
-
- @Override
- public Double getDoubleByField(String field) {
- return (Double) getValueByField(field);
- }
-
- @Override
- public Float getFloatByField(String field) {
- return (Float) getValueByField(field);
- }
-
- @Override
- public byte[] getBinaryByField(String field) {
- return (byte[]) getValueByField(field);
- }
-
- private Object getValueByPointer(ValuePointer ptr) {
- return ((List<Object>)_delegates.nth(ptr.delegateIndex)).get(ptr.index);
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/tuple/ValuePointer.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/tuple/ValuePointer.java b/jstorm-client/src/main/java/storm/trident/tuple/ValuePointer.java
deleted file mode 100644
index 401261e..0000000
--- a/jstorm-client/src/main/java/storm/trident/tuple/ValuePointer.java
+++ /dev/null
@@ -1,43 +0,0 @@
-package storm.trident.tuple;
-
-import backtype.storm.tuple.Fields;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.commons.lang.builder.ToStringBuilder;
-
-public class ValuePointer {
- public static Map<String, ValuePointer> buildFieldIndex(ValuePointer[] pointers) {
- Map<String, ValuePointer> ret = new HashMap<String, ValuePointer>();
- for(ValuePointer ptr: pointers) {
- ret.put(ptr.field, ptr);
- }
- return ret;
- }
-
- public static ValuePointer[] buildIndex(Fields fieldsOrder, Map<String, ValuePointer> pointers) {
- if(fieldsOrder.size()!=pointers.size()) {
- throw new IllegalArgumentException("Fields order must be same length as pointers map");
- }
- ValuePointer[] ret = new ValuePointer[pointers.size()];
- for(int i=0; i<fieldsOrder.size(); i++) {
- ret[i] = pointers.get(fieldsOrder.get(i));
- }
- return ret;
- }
-
- public int delegateIndex;
- protected int index;
- protected String field;
-
- public ValuePointer(int delegateIndex, int index, String field) {
- this.delegateIndex = delegateIndex;
- this.index = index;
- this.field = field;
- }
-
- @Override
- public String toString() {
- return ToStringBuilder.reflectionToString(this);
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/util/ErrorEdgeFactory.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/util/ErrorEdgeFactory.java b/jstorm-client/src/main/java/storm/trident/util/ErrorEdgeFactory.java
deleted file mode 100644
index 02cff2a..0000000
--- a/jstorm-client/src/main/java/storm/trident/util/ErrorEdgeFactory.java
+++ /dev/null
@@ -1,11 +0,0 @@
-package storm.trident.util;
-
-import java.io.Serializable;
-import org.jgrapht.EdgeFactory;
-
-public class ErrorEdgeFactory implements EdgeFactory, Serializable {
- @Override
- public Object createEdge(Object v, Object v1) {
- throw new RuntimeException("Edges should be made explicitly");
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/util/IndexedEdge.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/util/IndexedEdge.java b/jstorm-client/src/main/java/storm/trident/util/IndexedEdge.java
deleted file mode 100644
index 6201978..0000000
--- a/jstorm-client/src/main/java/storm/trident/util/IndexedEdge.java
+++ /dev/null
@@ -1,55 +0,0 @@
-package storm.trident.util;
-
-import java.io.Serializable;
-
-public class IndexedEdge<T> implements Comparable, Serializable {
- public T source;
- public T target;
- public int index;
-
- public IndexedEdge(T source, T target, int index) {
- this.source = source;
- this.target = target;
- this.index = index;
- }
-
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result + index;
- result = prime * result + ((source == null) ? 0 : source.hashCode());
- result = prime * result + ((target == null) ? 0 : target.hashCode());
- return result;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj)
- return true;
- if (obj == null)
- return false;
- if (getClass() != obj.getClass())
- return false;
- IndexedEdge other = (IndexedEdge) obj;
- if (index != other.index)
- return false;
- if (source == null) {
- if (other.source != null)
- return false;
- } else if (!source.equals(other.source))
- return false;
- if (target == null) {
- if (other.target != null)
- return false;
- } else if (!target.equals(other.target))
- return false;
- return true;
- }
-
- @Override
- public int compareTo(Object t) {
- IndexedEdge other = (IndexedEdge) t;
- return index - other.index;
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/util/LRUMap.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/util/LRUMap.java b/jstorm-client/src/main/java/storm/trident/util/LRUMap.java
deleted file mode 100644
index 8d1a9a3..0000000
--- a/jstorm-client/src/main/java/storm/trident/util/LRUMap.java
+++ /dev/null
@@ -1,18 +0,0 @@
-package storm.trident.util;
-
-import java.util.LinkedHashMap;
-import java.util.Map;
-
-public class LRUMap<A, B> extends LinkedHashMap<A, B> {
- private int _maxSize;
-
- public LRUMap(int maxSize) {
- super(maxSize + 1, 1.0f, true);
- _maxSize = maxSize;
- }
-
- @Override
- protected boolean removeEldestEntry(final Map.Entry<A, B> eldest) {
- return size() > _maxSize;
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/storm/trident/util/TridentUtils.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/storm/trident/util/TridentUtils.java b/jstorm-client/src/main/java/storm/trident/util/TridentUtils.java
deleted file mode 100644
index 0059721..0000000
--- a/jstorm-client/src/main/java/storm/trident/util/TridentUtils.java
+++ /dev/null
@@ -1,125 +0,0 @@
-package storm.trident.util;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.thrift7.TBase;
-import org.apache.thrift7.TDeserializer;
-import org.apache.thrift7.TException;
-import org.apache.thrift7.TSerializer;
-import org.jgrapht.DirectedGraph;
-
-import backtype.storm.generated.StreamInfo;
-import backtype.storm.topology.IComponent;
-import backtype.storm.topology.OutputFieldsGetter;
-import backtype.storm.tuple.Fields;
-
-public class TridentUtils {
- public static Fields fieldsUnion(Fields... fields) {
- Set<String> ret = new HashSet<String>();
- for(Fields f: fields) {
- if(f!=null) ret.addAll(f.toList());
- }
- return new Fields(new ArrayList<String>(ret));
- }
-
- public static Fields fieldsConcat(Fields... fields) {
- List<String> ret = new ArrayList<String>();
- for(Fields f: fields) {
- if(f!=null) ret.addAll(f.toList());
- }
- return new Fields(ret);
- }
-
- public static Fields fieldsSubtract(Fields all, Fields minus) {
- Set<String> removeSet = new HashSet<String>(minus.toList());
- List<String> toKeep = new ArrayList<String>();
- for(String s: all.toList()) {
- if(!removeSet.contains(s)) {
- toKeep.add(s);
- }
- }
- return new Fields(toKeep);
- }
-
- public static Fields getSingleOutputStreamFields(IComponent component) {
- OutputFieldsGetter getter = new OutputFieldsGetter();
- component.declareOutputFields(getter);
- Map<String, StreamInfo> declaration = getter.getFieldsDeclaration();
- if(declaration.size()!=1) {
- throw new RuntimeException("Trident only supports components that emit a single stream");
- }
- StreamInfo si = declaration.values().iterator().next();
- if(si.is_direct()) {
- throw new RuntimeException("Trident does not support direct streams");
- }
- return new Fields(si.get_output_fields());
- }
-
- /**
- * Assumes edge contains an index
- */
- public static <T> List<T> getParents(DirectedGraph g, T n) {
- List<IndexedEdge> incoming = new ArrayList(g.incomingEdgesOf(n));
- Collections.sort(incoming);
- List<T> ret = new ArrayList();
- for(IndexedEdge e: incoming) {
- ret.add((T)e.source);
- }
- return ret;
- }
-
- public static <T> List<T> getChildren(DirectedGraph g, T n) {
- List<IndexedEdge> outgoing = new ArrayList(g.outgoingEdgesOf(n));
- List<T> ret = new ArrayList();
- for(IndexedEdge e: outgoing) {
- ret.add((T)e.target);
- }
- return ret;
- }
-
-
- public static <T> T getParent(DirectedGraph g, T n) {
- List<T> parents = getParents(g, n);
- if(parents.size()!=1) {
- throw new RuntimeException("Expected a single parent");
- }
- return parents.get(0);
- }
-
- private static ThreadLocal<TSerializer> threadSer = new ThreadLocal<TSerializer>();
- private static ThreadLocal<TDeserializer> threadDes = new ThreadLocal<TDeserializer>();
-
- public static byte[] thriftSerialize(TBase t) {
- try {
- TSerializer ser = threadSer.get();
- if (ser == null) {
- ser = new TSerializer();
- threadSer.set(ser);
- }
- return ser.serialize(t);
- } catch (TException e) {
- throw new RuntimeException(e);
- }
- }
-
- public static <T> T thriftDeserialize(Class c, byte[] b) {
- try {
- T ret = (T) c.newInstance();
- TDeserializer des = threadDes.get();
- if (des == null) {
- des = new TDeserializer();
- threadDes.set(des);
- }
- des.deserialize((TBase) ret, b);
- return ret;
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/py/__init__.py
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/py/__init__.py b/jstorm-client/src/main/py/__init__.py
deleted file mode 100644
index e69de29..0000000
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/py/storm/DistributedRPC-remote
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/py/storm/DistributedRPC-remote b/jstorm-client/src/main/py/storm/DistributedRPC-remote
deleted file mode 100644
index 9b7ebd8..0000000
--- a/jstorm-client/src/main/py/storm/DistributedRPC-remote
+++ /dev/null
@@ -1,85 +0,0 @@
-#!/usr/bin/env python
-#
-# Autogenerated by Thrift Compiler (0.7.0)
-#
-# DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
-#
-
-import sys
-import pprint
-from urlparse import urlparse
-from thrift.transport import TTransport
-from thrift.transport import TSocket
-from thrift.transport import THttpClient
-from thrift.protocol import TBinaryProtocol
-
-import DistributedRPC
-from ttypes import *
-
-if len(sys.argv) <= 1 or sys.argv[1] == '--help':
- print ''
- print 'Usage: ' + sys.argv[0] + ' [-h host:port] [-u url] [-f[ramed]] function [arg1 [arg2...]]'
- print ''
- print 'Functions:'
- print ' string execute(string functionName, string funcArgs)'
- print ''
- sys.exit(0)
-
-pp = pprint.PrettyPrinter(indent = 2)
-host = 'localhost'
-port = 9090
-uri = ''
-framed = False
-http = False
-argi = 1
-
-if sys.argv[argi] == '-h':
- parts = sys.argv[argi+1].split(':')
- host = parts[0]
- port = int(parts[1])
- argi += 2
-
-if sys.argv[argi] == '-u':
- url = urlparse(sys.argv[argi+1])
- parts = url[1].split(':')
- host = parts[0]
- if len(parts) > 1:
- port = int(parts[1])
- else:
- port = 80
- uri = url[2]
- if url[4]:
- uri += '?%s' % url[4]
- http = True
- argi += 2
-
-if sys.argv[argi] == '-f' or sys.argv[argi] == '-framed':
- framed = True
- argi += 1
-
-cmd = sys.argv[argi]
-args = sys.argv[argi+1:]
-
-if http:
- transport = THttpClient.THttpClient(host, port, uri)
-else:
- socket = TSocket.TSocket(host, port)
- if framed:
- transport = TTransport.TFramedTransport(socket)
- else:
- transport = TTransport.TBufferedTransport(socket)
-protocol = TBinaryProtocol.TBinaryProtocol(transport)
-client = DistributedRPC.Client(protocol)
-transport.open()
-
-if cmd == 'execute':
- if len(args) != 2:
- print 'execute requires 2 args'
- sys.exit(1)
- pp.pprint(client.execute(args[0],args[1],))
-
-else:
- print 'Unrecognized method %s' % cmd
- sys.exit(1)
-
-transport.close()
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/py/storm/DistributedRPC.py
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/py/storm/DistributedRPC.py b/jstorm-client/src/main/py/storm/DistributedRPC.py
deleted file mode 100644
index a7e6ef9..0000000
--- a/jstorm-client/src/main/py/storm/DistributedRPC.py
+++ /dev/null
@@ -1,256 +0,0 @@
-#
-# Autogenerated by Thrift Compiler (0.7.0)
-#
-# DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
-#
-
-from thrift.Thrift import *
-from ttypes import *
-from thrift.Thrift import TProcessor
-from thrift.transport import TTransport
-from thrift.protocol import TBinaryProtocol, TProtocol
-try:
- from thrift.protocol import fastbinary
-except:
- fastbinary = None
-
-
-class Iface:
- def execute(self, functionName, funcArgs):
- """
- Parameters:
- - functionName
- - funcArgs
- """
- pass
-
-
-class Client(Iface):
- def __init__(self, iprot, oprot=None):
- self._iprot = self._oprot = iprot
- if oprot is not None:
- self._oprot = oprot
- self._seqid = 0
-
- def execute(self, functionName, funcArgs):
- """
- Parameters:
- - functionName
- - funcArgs
- """
- self.send_execute(functionName, funcArgs)
- return self.recv_execute()
-
- def send_execute(self, functionName, funcArgs):
- self._oprot.writeMessageBegin('execute', TMessageType.CALL, self._seqid)
- args = execute_args()
- args.functionName = functionName
- args.funcArgs = funcArgs
- args.write(self._oprot)
- self._oprot.writeMessageEnd()
- self._oprot.trans.flush()
-
- def recv_execute(self, ):
- (fname, mtype, rseqid) = self._iprot.readMessageBegin()
- if mtype == TMessageType.EXCEPTION:
- x = TApplicationException()
- x.read(self._iprot)
- self._iprot.readMessageEnd()
- raise x
- result = execute_result()
- result.read(self._iprot)
- self._iprot.readMessageEnd()
- if result.success is not None:
- return result.success
- if result.e is not None:
- raise result.e
- raise TApplicationException(TApplicationException.MISSING_RESULT, "execute failed: unknown result");
-
-
-class Processor(Iface, TProcessor):
- def __init__(self, handler):
- self._handler = handler
- self._processMap = {}
- self._processMap["execute"] = Processor.process_execute
-
- def process(self, iprot, oprot):
- (name, type, seqid) = iprot.readMessageBegin()
- if name not in self._processMap:
- iprot.skip(TType.STRUCT)
- iprot.readMessageEnd()
- x = TApplicationException(TApplicationException.UNKNOWN_METHOD, 'Unknown function %s' % (name))
- oprot.writeMessageBegin(name, TMessageType.EXCEPTION, seqid)
- x.write(oprot)
- oprot.writeMessageEnd()
- oprot.trans.flush()
- return
- else:
- self._processMap[name](self, seqid, iprot, oprot)
- return True
-
- def process_execute(self, seqid, iprot, oprot):
- args = execute_args()
- args.read(iprot)
- iprot.readMessageEnd()
- result = execute_result()
- try:
- result.success = self._handler.execute(args.functionName, args.funcArgs)
- except DRPCExecutionException, e:
- result.e = e
- oprot.writeMessageBegin("execute", TMessageType.REPLY, seqid)
- result.write(oprot)
- oprot.writeMessageEnd()
- oprot.trans.flush()
-
-
-# HELPER FUNCTIONS AND STRUCTURES
-
-class execute_args:
- """
- Attributes:
- - functionName
- - funcArgs
- """
-
- thrift_spec = (
- None, # 0
- (1, TType.STRING, 'functionName', None, None, ), # 1
- (2, TType.STRING, 'funcArgs', None, None, ), # 2
- )
-
- def __hash__(self):
- return 0 + hash(self.functionName) + hash(self.funcArgs)
-
- def __init__(self, functionName=None, funcArgs=None,):
- self.functionName = functionName
- self.funcArgs = funcArgs
-
- def read(self, iprot):
- if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
- fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
- return
- iprot.readStructBegin()
- while True:
- (fname, ftype, fid) = iprot.readFieldBegin()
- if ftype == TType.STOP:
- break
- if fid == 1:
- if ftype == TType.STRING:
- self.functionName = iprot.readString().decode('utf-8')
- else:
- iprot.skip(ftype)
- elif fid == 2:
- if ftype == TType.STRING:
- self.funcArgs = iprot.readString().decode('utf-8')
- else:
- iprot.skip(ftype)
- else:
- iprot.skip(ftype)
- iprot.readFieldEnd()
- iprot.readStructEnd()
-
- def write(self, oprot):
- if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
- oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
- return
- oprot.writeStructBegin('execute_args')
- if self.functionName is not None:
- oprot.writeFieldBegin('functionName', TType.STRING, 1)
- oprot.writeString(self.functionName.encode('utf-8'))
- oprot.writeFieldEnd()
- if self.funcArgs is not None:
- oprot.writeFieldBegin('funcArgs', TType.STRING, 2)
- oprot.writeString(self.funcArgs.encode('utf-8'))
- oprot.writeFieldEnd()
- oprot.writeFieldStop()
- oprot.writeStructEnd()
-
- def validate(self):
- return
-
-
- def __repr__(self):
- L = ['%s=%r' % (key, value)
- for key, value in self.__dict__.iteritems()]
- return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
-
- def __eq__(self, other):
- return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
-
- def __ne__(self, other):
- return not (self == other)
-
-class execute_result:
- """
- Attributes:
- - success
- - e
- """
-
- thrift_spec = (
- (0, TType.STRING, 'success', None, None, ), # 0
- (1, TType.STRUCT, 'e', (DRPCExecutionException, DRPCExecutionException.thrift_spec), None, ), # 1
- )
-
- def __hash__(self):
- return 0 + hash(self.success) + hash(self.e)
-
- def __init__(self, success=None, e=None,):
- self.success = success
- self.e = e
-
- def read(self, iprot):
- if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
- fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
- return
- iprot.readStructBegin()
- while True:
- (fname, ftype, fid) = iprot.readFieldBegin()
- if ftype == TType.STOP:
- break
- if fid == 0:
- if ftype == TType.STRING:
- self.success = iprot.readString().decode('utf-8')
- else:
- iprot.skip(ftype)
- elif fid == 1:
- if ftype == TType.STRUCT:
- self.e = DRPCExecutionException()
- self.e.read(iprot)
- else:
- iprot.skip(ftype)
- else:
- iprot.skip(ftype)
- iprot.readFieldEnd()
- iprot.readStructEnd()
-
- def write(self, oprot):
- if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
- oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
- return
- oprot.writeStructBegin('execute_result')
- if self.success is not None:
- oprot.writeFieldBegin('success', TType.STRING, 0)
- oprot.writeString(self.success.encode('utf-8'))
- oprot.writeFieldEnd()
- if self.e is not None:
- oprot.writeFieldBegin('e', TType.STRUCT, 1)
- self.e.write(oprot)
- oprot.writeFieldEnd()
- oprot.writeFieldStop()
- oprot.writeStructEnd()
-
- def validate(self):
- return
-
-
- def __repr__(self):
- L = ['%s=%r' % (key, value)
- for key, value in self.__dict__.iteritems()]
- return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
-
- def __eq__(self, other):
- return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
-
- def __ne__(self, other):
- return not (self == other)
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/py/storm/DistributedRPCInvocations-remote
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/py/storm/DistributedRPCInvocations-remote b/jstorm-client/src/main/py/storm/DistributedRPCInvocations-remote
deleted file mode 100644
index 5235dfe..0000000
--- a/jstorm-client/src/main/py/storm/DistributedRPCInvocations-remote
+++ /dev/null
@@ -1,99 +0,0 @@
-#!/usr/bin/env python
-#
-# Autogenerated by Thrift Compiler (0.7.0)
-#
-# DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
-#
-
-import sys
-import pprint
-from urlparse import urlparse
-from thrift.transport import TTransport
-from thrift.transport import TSocket
-from thrift.transport import THttpClient
-from thrift.protocol import TBinaryProtocol
-
-import DistributedRPCInvocations
-from ttypes import *
-
-if len(sys.argv) <= 1 or sys.argv[1] == '--help':
- print ''
- print 'Usage: ' + sys.argv[0] + ' [-h host:port] [-u url] [-f[ramed]] function [arg1 [arg2...]]'
- print ''
- print 'Functions:'
- print ' void result(string id, string result)'
- print ' DRPCRequest fetchRequest(string functionName)'
- print ' void failRequest(string id)'
- print ''
- sys.exit(0)
-
-pp = pprint.PrettyPrinter(indent = 2)
-host = 'localhost'
-port = 9090
-uri = ''
-framed = False
-http = False
-argi = 1
-
-if sys.argv[argi] == '-h':
- parts = sys.argv[argi+1].split(':')
- host = parts[0]
- port = int(parts[1])
- argi += 2
-
-if sys.argv[argi] == '-u':
- url = urlparse(sys.argv[argi+1])
- parts = url[1].split(':')
- host = parts[0]
- if len(parts) > 1:
- port = int(parts[1])
- else:
- port = 80
- uri = url[2]
- if url[4]:
- uri += '?%s' % url[4]
- http = True
- argi += 2
-
-if sys.argv[argi] == '-f' or sys.argv[argi] == '-framed':
- framed = True
- argi += 1
-
-cmd = sys.argv[argi]
-args = sys.argv[argi+1:]
-
-if http:
- transport = THttpClient.THttpClient(host, port, uri)
-else:
- socket = TSocket.TSocket(host, port)
- if framed:
- transport = TTransport.TFramedTransport(socket)
- else:
- transport = TTransport.TBufferedTransport(socket)
-protocol = TBinaryProtocol.TBinaryProtocol(transport)
-client = DistributedRPCInvocations.Client(protocol)
-transport.open()
-
-if cmd == 'result':
- if len(args) != 2:
- print 'result requires 2 args'
- sys.exit(1)
- pp.pprint(client.result(args[0],args[1],))
-
-elif cmd == 'fetchRequest':
- if len(args) != 1:
- print 'fetchRequest requires 1 args'
- sys.exit(1)
- pp.pprint(client.fetchRequest(args[0],))
-
-elif cmd == 'failRequest':
- if len(args) != 1:
- print 'failRequest requires 1 args'
- sys.exit(1)
- pp.pprint(client.failRequest(args[0],))
-
-else:
- print 'Unrecognized method %s' % cmd
- sys.exit(1)
-
-transport.close()