You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/11/05 21:41:05 UTC
[26/60] [abbrv] [partial] storm git commit: Release 2.0.4-SNAPSHOT
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/task/ShellBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/task/ShellBolt.java b/jstorm-client/src/main/java/backtype/storm/task/ShellBolt.java
deleted file mode 100644
index ba79f7d..0000000
--- a/jstorm-client/src/main/java/backtype/storm/task/ShellBolt.java
+++ /dev/null
@@ -1,383 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package backtype.storm.task;
-
-import backtype.storm.Config;
-import backtype.storm.Constants;
-import backtype.storm.generated.ShellComponent;
-import backtype.storm.metric.api.IMetric;
-import backtype.storm.metric.api.rpc.IShellMetric;
-import backtype.storm.multilang.BoltMsg;
-import backtype.storm.multilang.ShellMsg;
-import backtype.storm.topology.ReportedFailedException;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.utils.ShellProcess;
-import clojure.lang.RT;
-import com.google.common.util.concurrent.MoreExecutors;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.*;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-
-import static java.util.concurrent.TimeUnit.SECONDS;
-
-/**
- * A bolt that shells out to another process to process tuples. ShellBolt
- * communicates with that process over stdio using a special protocol. An ~100
- * line library is required to implement that protocol, and adapter libraries
- * currently exist for Ruby and Python.
- *
- * <p>To run a ShellBolt on a cluster, the scripts that are shelled out to must be
- * in the resources directory within the jar submitted to the master.
- * During development/testing on a local machine, that resources directory just
- * needs to be on the classpath.</p>
- *
- * <p>When creating topologies using the Java API, subclass this bolt and implement
- * the IRichBolt interface to create components for the topology that use other languages. For example:
- * </p>
- *
- * <pre>
- * public class MyBolt extends ShellBolt implements IRichBolt {
- * public MyBolt() {
- * super("python", "mybolt.py");
- * }
- *
- * public void declareOutputFields(OutputFieldsDeclarer declarer) {
- * declarer.declare(new Fields("field1", "field2"));
- * }
- * }
- * </pre>
- */
-public class ShellBolt implements IBolt {
- public static final String HEARTBEAT_STREAM_ID = "__heartbeat";
- public static Logger LOG = LoggerFactory.getLogger(ShellBolt.class);
- Process _subprocess;
- OutputCollector _collector;
- Map<String, Tuple> _inputs = new ConcurrentHashMap<String, Tuple>();
-
- private String[] _command;
- private ShellProcess _process;
- private volatile boolean _running = true;
- private volatile Throwable _exception;
- private LinkedBlockingQueue _pendingWrites = new LinkedBlockingQueue();
- private Random _rand;
-
- private Thread _readerThread;
- private Thread _writerThread;
-
- private TopologyContext _context;
-
- private int workerTimeoutMills;
- private ScheduledExecutorService heartBeatExecutorService;
- private AtomicLong lastHeartbeatTimestamp = new AtomicLong();
- private AtomicBoolean sendHeartbeatFlag = new AtomicBoolean(false);
-
- public ShellBolt(ShellComponent component) {
- this(component.get_execution_command(), component.get_script());
- }
-
- public ShellBolt(String... command) {
- _command = command;
- }
-
- public void prepare(Map stormConf, TopologyContext context,
- final OutputCollector collector) {
- Object maxPending = stormConf.get(Config.TOPOLOGY_SHELLBOLT_MAX_PENDING);
- if (maxPending != null) {
- this._pendingWrites = new LinkedBlockingQueue(((Number)maxPending).intValue());
- }
- _rand = new Random();
- _collector = collector;
-
- _context = context;
-
- workerTimeoutMills = 1000 * RT.intCast(stormConf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS));
-
- _process = new ShellProcess(_command);
-
- //subprocesses must send their pid first thing
- Number subpid = _process.launch(stormConf, context);
- LOG.info("Launched subprocess with pid " + subpid);
-
- // reader
- _readerThread = new Thread(new BoltReaderRunnable());
- _readerThread.start();
-
- _writerThread = new Thread(new BoltWriterRunnable());
- _writerThread.start();
-
- heartBeatExecutorService = MoreExecutors.getExitingScheduledExecutorService(new ScheduledThreadPoolExecutor(1));
- heartBeatExecutorService.scheduleAtFixedRate(new BoltHeartbeatTimerTask(this), 1, 1, TimeUnit.SECONDS);
-
- LOG.info("Start checking heartbeat...");
- setHeartbeat();
- }
-
- public void execute(Tuple input) {
- if (_exception != null) {
- throw new RuntimeException(_exception);
- }
-
- //just need an id
- String genId = Long.toString(_rand.nextLong());
- _inputs.put(genId, input);
- try {
- BoltMsg boltMsg = createBoltMessage(input, genId);
-
- _pendingWrites.put(boltMsg);
- } catch(InterruptedException e) {
- String processInfo = _process.getProcessInfoString() + _process.getProcessTerminationInfoString();
- throw new RuntimeException("Error during multilang processing " + processInfo, e);
- }
- }
-
- private BoltMsg createBoltMessage(Tuple input, String genId) {
- BoltMsg boltMsg = new BoltMsg();
- boltMsg.setId(genId);
- boltMsg.setComp(input.getSourceComponent());
- boltMsg.setStream(input.getSourceStreamId());
- boltMsg.setTask(input.getSourceTask());
- boltMsg.setTuple(input.getValues());
- return boltMsg;
- }
-
- public void cleanup() {
- _running = false;
- heartBeatExecutorService.shutdownNow();
- _writerThread.interrupt();
- _readerThread.interrupt();
- _process.destroy();
- _inputs.clear();
- }
-
- private void handleAck(Object id) {
- Tuple acked = _inputs.remove(id);
- if(acked==null) {
- throw new RuntimeException("Acked a non-existent or already acked/failed id: " + id);
- }
- _collector.ack(acked);
- }
-
- private void handleFail(Object id) {
- Tuple failed = _inputs.remove(id);
- if(failed==null) {
- throw new RuntimeException("Failed a non-existent or already acked/failed id: " + id);
- }
- _collector.fail(failed);
- }
-
- private void handleError(String msg) {
- _collector.reportError(new Exception("Shell Process Exception: " + msg));
- }
-
- private void handleEmit(ShellMsg shellMsg) throws InterruptedException {
- List<Tuple> anchors = new ArrayList<Tuple>();
- List<String> recvAnchors = shellMsg.getAnchors();
- if (recvAnchors != null) {
- for (String anchor : recvAnchors) {
- Tuple t = _inputs.get(anchor);
- if (t == null) {
- throw new RuntimeException("Anchored onto " + anchor + " after ack/fail");
- }
- anchors.add(t);
- }
- }
-
- if(shellMsg.getTask() == 0) {
- List<Integer> outtasks = _collector.emit(shellMsg.getStream(), anchors, shellMsg.getTuple());
- if (shellMsg.areTaskIdsNeeded()) {
- _pendingWrites.put(outtasks);
- }
- } else {
- _collector.emitDirect((int) shellMsg.getTask(),
- shellMsg.getStream(), anchors, shellMsg.getTuple());
- }
- }
-
- private void handleLog(ShellMsg shellMsg) {
- String msg = shellMsg.getMsg();
- msg = "ShellLog " + _process.getProcessInfoString() + " " + msg;
- ShellMsg.ShellLogLevel logLevel = shellMsg.getLogLevel();
-
- switch (logLevel) {
- case TRACE:
- LOG.trace(msg);
- break;
- case DEBUG:
- LOG.debug(msg);
- break;
- case INFO:
- LOG.info(msg);
- break;
- case WARN:
- LOG.warn(msg);
- break;
- case ERROR:
- LOG.error(msg);
- _collector.reportError(new ReportedFailedException(msg));
- break;
- default:
- LOG.info(msg);
- break;
- }
- }
-
- private void handleMetrics(ShellMsg shellMsg) {
- //get metric name
- String name = shellMsg.getMetricName();
- if (name.isEmpty()) {
- throw new RuntimeException("Receive Metrics name is empty");
- }
-
- //get metric by name
- IMetric iMetric = _context.getRegisteredMetricByName(name);
- if (iMetric == null) {
- throw new RuntimeException("Could not find metric by name["+name+"] ");
- }
- if ( !(iMetric instanceof IShellMetric)) {
- throw new RuntimeException("Metric["+name+"] is not IShellMetric, can not call by RPC");
- }
- IShellMetric iShellMetric = (IShellMetric)iMetric;
-
- //call updateMetricFromRPC with params
- Object paramsObj = shellMsg.getMetricParams();
- try {
- iShellMetric.updateMetricFromRPC(paramsObj);
- } catch (RuntimeException re) {
- throw re;
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- private void setHeartbeat() {
- lastHeartbeatTimestamp.set(System.currentTimeMillis());
- }
-
- private long getLastHeartbeat() {
- return lastHeartbeatTimestamp.get();
- }
-
- private void die(Throwable exception) {
- String processInfo = _process.getProcessInfoString() + _process.getProcessTerminationInfoString();
- _exception = new RuntimeException(processInfo, exception);
- LOG.error("Halting process: ShellBolt died.", exception);
- _collector.reportError(exception);
- if (_running || (exception instanceof Error)) { //don't exit if not running, unless it is an Error
- System.exit(11);
- }
- }
-
- private class BoltHeartbeatTimerTask extends TimerTask {
- private ShellBolt bolt;
-
- public BoltHeartbeatTimerTask(ShellBolt bolt) {
- this.bolt = bolt;
- }
-
- @Override
- public void run() {
- long currentTimeMillis = System.currentTimeMillis();
- long lastHeartbeat = getLastHeartbeat();
-
- LOG.debug("BOLT - current time : {}, last heartbeat : {}, worker timeout (ms) : {}",
- currentTimeMillis, lastHeartbeat, workerTimeoutMills);
-
- if (currentTimeMillis - lastHeartbeat > workerTimeoutMills) {
- bolt.die(new RuntimeException("subprocess heartbeat timeout"));
- }
-
- sendHeartbeatFlag.compareAndSet(false, true);
- }
-
-
- }
-
- private class BoltReaderRunnable implements Runnable {
- public void run() {
- while (_running) {
- try {
- ShellMsg shellMsg = _process.readShellMsg();
-
- String command = shellMsg.getCommand();
- if (command == null) {
- throw new IllegalArgumentException("Command not found in bolt message: " + shellMsg);
- }
- if (command.equals("sync")) {
- setHeartbeat();
- } else if(command.equals("ack")) {
- handleAck(shellMsg.getId());
- } else if (command.equals("fail")) {
- handleFail(shellMsg.getId());
- } else if (command.equals("error")) {
- handleError(shellMsg.getMsg());
- } else if (command.equals("log")) {
- handleLog(shellMsg);
- } else if (command.equals("emit")) {
- handleEmit(shellMsg);
- } else if (command.equals("metrics")) {
- handleMetrics(shellMsg);
- }
- } catch (InterruptedException e) {
- } catch (Throwable t) {
- die(t);
- }
- }
- }
- }
-
- private class BoltWriterRunnable implements Runnable {
- public void run() {
- while (_running) {
- try {
- if (sendHeartbeatFlag.get()) {
- LOG.debug("BOLT - sending heartbeat request to subprocess");
-
- String genId = Long.toString(_rand.nextLong());
- _process.writeBoltMsg(createHeartbeatBoltMessage(genId));
- sendHeartbeatFlag.compareAndSet(true, false);
- }
-
- Object write = _pendingWrites.poll(1, SECONDS);
- if (write instanceof BoltMsg) {
- _process.writeBoltMsg((BoltMsg) write);
- } else if (write instanceof List<?>) {
- _process.writeTaskIds((List<Integer>)write);
- } else if (write != null) {
- throw new RuntimeException("Unknown class type to write: " + write.getClass().getName());
- }
- } catch (InterruptedException e) {
- } catch (Throwable t) {
- die(t);
- }
- }
- }
-
- private BoltMsg createHeartbeatBoltMessage(String genId) {
- BoltMsg msg = new BoltMsg();
- msg.setId(genId);
- msg.setTask(Constants.SYSTEM_TASK_ID);
- msg.setStream(HEARTBEAT_STREAM_ID);
- msg.setTuple(new ArrayList<Object>());
- return msg;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/task/TopologyContext.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/task/TopologyContext.java b/jstorm-client/src/main/java/backtype/storm/task/TopologyContext.java
deleted file mode 100644
index 34ef4fa..0000000
--- a/jstorm-client/src/main/java/backtype/storm/task/TopologyContext.java
+++ /dev/null
@@ -1,317 +0,0 @@
-package backtype.storm.task;
-
-import backtype.storm.generated.GlobalStreamId;
-import backtype.storm.generated.Grouping;
-import backtype.storm.generated.StormTopology;
-import backtype.storm.hooks.ITaskHook;
-import backtype.storm.metric.api.IMetric;
-import backtype.storm.metric.api.IReducer;
-import backtype.storm.metric.api.ICombiner;
-import backtype.storm.metric.api.ReducedMetric;
-import backtype.storm.metric.api.CombinedMetric;
-import backtype.storm.state.ISubscribedState;
-import backtype.storm.tuple.Fields;
-import backtype.storm.utils.Utils;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import org.apache.commons.lang.NotImplementedException;
-
-/**
- * A TopologyContext is given to bolts and spouts in their "prepare" and "open"
- * methods, respectively. This object provides information about the component's
- * place within the topology, such as task ids, inputs and outputs, etc.
- *
- * <p>
- * The TopologyContext is also used to declare ISubscribedState objects to
- * synchronize state with StateSpouts this object is subscribed to.
- * </p>
- */
-public class TopologyContext extends WorkerTopologyContext implements
- IMetricsContext {
- private Integer _taskId;
- private Map<String, Object> _taskData = new HashMap<String, Object>();
- private List<ITaskHook> _hooks = new ArrayList<ITaskHook>();
- private Map<String, Object> _executorData;
- private Map<Integer, Map<Integer, Map<String, IMetric>>> _registeredMetrics;
- private clojure.lang.Atom _openOrPrepareWasCalled;
-
- public TopologyContext(StormTopology topology, Map stormConf,
- Map<Integer, String> taskToComponent,
- Map<String, List<Integer>> componentToSortedTasks,
- Map<String, Map<String, Fields>> componentToStreamToFields,
- String topologyId, String codeDir, String pidDir, Integer taskId,
- Integer workerPort, List<Integer> workerTasks,
- Map<String, Object> defaultResources,
- Map<String, Object> userResources,
- Map<String, Object> executorData, Map registeredMetrics,
- clojure.lang.Atom openOrPrepareWasCalled) {
- super(topology, stormConf, taskToComponent, componentToSortedTasks,
- componentToStreamToFields, topologyId, codeDir, pidDir,
- workerPort, workerTasks, defaultResources, userResources);
- _taskId = taskId;
- _executorData = executorData;
- _registeredMetrics = registeredMetrics;
- _openOrPrepareWasCalled = openOrPrepareWasCalled;
- }
-
- /**
- * All state from all subscribed state spouts streams will be synced with
- * the provided object.
- *
- * <p>
- * It is recommended that your ISubscribedState object is kept as an
- * instance variable of this object. The recommended usage of this method is
- * as follows:
- * </p>
- *
- * <p>
- * _myState = context.setAllSubscribedState(new MyState());
- * </p>
- *
- * @param obj
- * Provided ISubscribedState implementation
- * @return Returns the ISubscribedState object provided
- */
- public <T extends ISubscribedState> T setAllSubscribedState(T obj) {
- // check that only subscribed to one component/stream for statespout
- // setsubscribedstate appropriately
- throw new NotImplementedException();
- }
-
- /**
- * Synchronizes the default stream from the specified state spout component
- * id with the provided ISubscribedState object.
- *
- * <p>
- * The recommended usage of this method is as follows:
- * </p>
- * <p>
- * _myState = context.setSubscribedState(componentId, new MyState());
- * </p>
- *
- * @param componentId
- * the id of the StateSpout component to subscribe to
- * @param obj
- * Provided ISubscribedState implementation
- * @return Returns the ISubscribedState object provided
- */
- public <T extends ISubscribedState> T setSubscribedState(
- String componentId, T obj) {
- return setSubscribedState(componentId, Utils.DEFAULT_STREAM_ID, obj);
- }
-
- /**
- * Synchronizes the specified stream from the specified state spout
- * component id with the provided ISubscribedState object.
- *
- * <p>
- * The recommended usage of this method is as follows:
- * </p>
- * <p>
- * _myState = context.setSubscribedState(componentId, streamId, new
- * MyState());
- * </p>
- *
- * @param componentId
- * the id of the StateSpout component to subscribe to
- * @param streamId
- * the stream to subscribe to
- * @param obj
- * Provided ISubscribedState implementation
- * @return Returns the ISubscribedState object provided
- */
- public <T extends ISubscribedState> T setSubscribedState(
- String componentId, String streamId, T obj) {
- throw new NotImplementedException();
- }
-
- /**
- * Gets the task id of this task.
- *
- * @return the task id
- */
- public int getThisTaskId() {
- return _taskId;
- }
-
- /**
- * Gets the component id for this task. The component id maps to a component
- * id specified for a Spout or Bolt in the topology definition.
- *
- * @return
- */
- public String getThisComponentId() {
- return getComponentId(_taskId);
- }
-
- /**
- * Gets the declared output fields for the specified stream id for the
- * component this task is a part of.
- */
- public Fields getThisOutputFields(String streamId) {
- return getComponentOutputFields(getThisComponentId(), streamId);
- }
-
- /**
- * Gets the set of streams declared for the component of this task.
- */
- public Set<String> getThisStreams() {
- return getComponentStreams(getThisComponentId());
- }
-
- /**
- * Gets the index of this task id in
- * getComponentTasks(getThisComponentId()). An example use case for this
- * method is determining which task accesses which resource in a distributed
- * resource to ensure an even distribution.
- */
- public int getThisTaskIndex() {
- List<Integer> tasks = new ArrayList<Integer>(
- getComponentTasks(getThisComponentId()));
- Collections.sort(tasks);
- for (int i = 0; i < tasks.size(); i++) {
- if (tasks.get(i) == getThisTaskId()) {
- return i;
- }
- }
- throw new RuntimeException(
- "Fatal: could not find this task id in this component");
- }
-
- /**
- * Gets the declared inputs to this component.
- *
- * @return A map from subscribed component/stream to the grouping subscribed
- * with.
- */
- public Map<GlobalStreamId, Grouping> getThisSources() {
- return getSources(getThisComponentId());
- }
-
- /**
- * Gets information about who is consuming the outputs of this component,
- * and how.
- *
- * @return Map from stream id to component id to the Grouping used.
- */
- public Map<String, Map<String, Grouping>> getThisTargets() {
- return getTargets(getThisComponentId());
- }
-
- public void setTaskData(String name, Object data) {
- _taskData.put(name, data);
- }
-
- public Object getTaskData(String name) {
- return _taskData.get(name);
- }
-
- public void setExecutorData(String name, Object data) {
- _executorData.put(name, data);
- }
-
- public Object getExecutorData(String name) {
- return _executorData.get(name);
- }
-
- public void addTaskHook(ITaskHook hook) {
- hook.prepare(_stormConf, this);
- _hooks.add(hook);
- }
-
- public Collection<ITaskHook> getHooks() {
- return _hooks;
- }
-
- /*
- * Register a IMetric instance. Storm will then call getValueAndReset on the
- * metric every timeBucketSizeInSecs and the returned value is sent to all
- * metrics consumers. You must call this during IBolt::prepare or
- * ISpout::open.
- *
- * @return The IMetric argument unchanged.
- */
- public <T extends IMetric> T registerMetric(String name, T metric,
- int timeBucketSizeInSecs) {
- if ((Boolean) _openOrPrepareWasCalled.deref() == true) {
- throw new RuntimeException(
- "TopologyContext.registerMetric can only be called from within overridden "
- + "IBolt::prepare() or ISpout::open() method.");
- }
-
- if (metric == null) {
- throw new IllegalArgumentException("Cannot register a null metric");
- }
-
- if (timeBucketSizeInSecs <= 0) {
- throw new IllegalArgumentException("TopologyContext.registerMetric can only be called with timeBucketSizeInSecs " +
- "greater than or equal to 1 second.");
- }
-
- if (getRegisteredMetricByName(name) != null) {
- throw new RuntimeException("The same metric name `" + name + "` was registered twice." );
- }
-
- Map m1 = _registeredMetrics;
- if (!m1.containsKey(timeBucketSizeInSecs)) {
- m1.put(timeBucketSizeInSecs, new HashMap());
- }
-
- Map m2 = (Map) m1.get(timeBucketSizeInSecs);
- if (!m2.containsKey(_taskId)) {
- m2.put(_taskId, new HashMap());
- }
-
- Map m3 = (Map) m2.get(_taskId);
- if (m3.containsKey(name)) {
- throw new RuntimeException("The same metric name `" + name
- + "` was registered twice.");
- } else {
- m3.put(name, metric);
- }
-
- return metric;
- }
-
- /**
- * Get component's metric from registered metrics by name.
- * Notice: Normally, one component can only register one metric name once.
- * But now registerMetric has a bug(https://issues.apache.org/jira/browse/STORM-254)
- * cause the same metric name can register twice.
- * So we just return the first metric we meet.
- */
- public IMetric getRegisteredMetricByName(String name) {
- IMetric metric = null;
-
- for (Map<Integer, Map<String, IMetric>> taskIdToNameToMetric: _registeredMetrics.values()) {
- Map<String, IMetric> nameToMetric = taskIdToNameToMetric.get(_taskId);
- if (nameToMetric != null) {
- metric = nameToMetric.get(name);
- if (metric != null) {
- //we just return the first metric we meet
- break;
- }
- }
- }
-
- return metric;
- }
-
- /*
- * Convinience method for registering ReducedMetric.
- */
- public ReducedMetric registerMetric(String name, IReducer reducer, int timeBucketSizeInSecs) {
- return registerMetric(name, new ReducedMetric(reducer), timeBucketSizeInSecs);
- }
- /*
- * Convinience method for registering CombinedMetric.
- */
- public CombinedMetric registerMetric(String name, ICombiner combiner, int timeBucketSizeInSecs) {
- return registerMetric(name, new CombinedMetric(combiner), timeBucketSizeInSecs);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/task/WorkerTopologyContext.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/task/WorkerTopologyContext.java b/jstorm-client/src/main/java/backtype/storm/task/WorkerTopologyContext.java
deleted file mode 100644
index 42e88dc..0000000
--- a/jstorm-client/src/main/java/backtype/storm/task/WorkerTopologyContext.java
+++ /dev/null
@@ -1,84 +0,0 @@
-package backtype.storm.task;
-
-import backtype.storm.generated.StormTopology;
-import backtype.storm.tuple.Fields;
-import java.io.File;
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-
-public class WorkerTopologyContext extends GeneralTopologyContext {
- public static final String SHARED_EXECUTOR = "executor";
-
- private Integer _workerPort;
- private List<Integer> _workerTasks;
- private String _codeDir;
- private String _pidDir;
- Map<String, Object> _userResources;
- Map<String, Object> _defaultResources;
-
- public WorkerTopologyContext(StormTopology topology, Map stormConf,
- Map<Integer, String> taskToComponent,
- Map<String, List<Integer>> componentToSortedTasks,
- Map<String, Map<String, Fields>> componentToStreamToFields,
- String topologyId, String codeDir, String pidDir, Integer workerPort,
- List<Integer> workerTasks, Map<String, Object> defaultResources,
- Map<String, Object> userResources) {
- super(topology, stormConf, taskToComponent, componentToSortedTasks,
- componentToStreamToFields, topologyId);
- _codeDir = codeDir;
- _defaultResources = defaultResources;
- _userResources = userResources;
- try {
- if (pidDir != null) {
- _pidDir = new File(pidDir).getCanonicalPath();
- } else {
- _pidDir = null;
- }
- } catch (IOException e) {
- throw new RuntimeException("Could not get canonical path for "
- + _pidDir, e);
- }
- _workerPort = workerPort;
- _workerTasks = workerTasks;
- }
-
- /**
- * Gets all the task ids that are running in this worker process (including
- * the task for this task).
- */
- public List<Integer> getThisWorkerTasks() {
- return _workerTasks;
- }
-
- public Integer getThisWorkerPort() {
- return _workerPort;
- }
-
- /**
- * Gets the location of the external resources for this worker on the local
- * filesystem. These external resources typically include bolts implemented
- * in other languages, such as Ruby or Python.
- */
- public String getCodeDir() {
- return _codeDir;
- }
-
- /**
- * If this task spawns any subprocesses, those subprocesses must immediately
- * write their PID to this directory on the local filesystem to ensure that
- * Storm properly destroys that process when the worker is shutdown.
- */
- public String getPIDDir() {
- return _pidDir;
- }
-
- public Object getResource(String name) {
- return _userResources.get(name);
- }
-
- public ExecutorService getSharedExecutor() {
- return (ExecutorService) _defaultResources.get(SHARED_EXECUTOR);
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/testing/AckFailDelegate.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/testing/AckFailDelegate.java b/jstorm-client/src/main/java/backtype/storm/testing/AckFailDelegate.java
deleted file mode 100644
index 131dee7..0000000
--- a/jstorm-client/src/main/java/backtype/storm/testing/AckFailDelegate.java
+++ /dev/null
@@ -1,9 +0,0 @@
-package backtype.storm.testing;
-
-import java.io.Serializable;
-
-public interface AckFailDelegate extends Serializable {
- public void ack(Object id);
-
- public void fail(Object id);
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/testing/AckFailMapTracker.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/testing/AckFailMapTracker.java b/jstorm-client/src/main/java/backtype/storm/testing/AckFailMapTracker.java
deleted file mode 100644
index 68d334d..0000000
--- a/jstorm-client/src/main/java/backtype/storm/testing/AckFailMapTracker.java
+++ /dev/null
@@ -1,35 +0,0 @@
-package backtype.storm.testing;
-
-import backtype.storm.utils.RegisteredGlobalState;
-import java.util.HashSet;
-import java.util.Set;
-
-public class AckFailMapTracker implements AckFailDelegate {
-
- String _acked;
- String _failed;
-
- public AckFailMapTracker() {
- _acked = RegisteredGlobalState.registerState(new HashSet());
- _failed = RegisteredGlobalState.registerState(new HashSet());
- }
-
- public boolean isAcked(Object id) {
- return ((Set) RegisteredGlobalState.getState(_acked)).contains(id);
- }
-
- public boolean isFailed(Object id) {
- return ((Set) RegisteredGlobalState.getState(_failed)).contains(id);
- }
-
- @Override
- public void ack(Object id) {
- ((Set) RegisteredGlobalState.getState(_acked)).add(id);
- }
-
- @Override
- public void fail(Object id) {
- ((Set) RegisteredGlobalState.getState(_failed)).add(id);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/testing/AckTracker.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/testing/AckTracker.java b/jstorm-client/src/main/java/backtype/storm/testing/AckTracker.java
deleted file mode 100644
index 134f8f0..0000000
--- a/jstorm-client/src/main/java/backtype/storm/testing/AckTracker.java
+++ /dev/null
@@ -1,35 +0,0 @@
-package backtype.storm.testing;
-
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
-
-public class AckTracker implements AckFailDelegate {
- private static Map<String, AtomicInteger> acks = new ConcurrentHashMap<String, AtomicInteger>();
-
- private String _id;
-
- public AckTracker() {
- _id = UUID.randomUUID().toString();
- acks.put(_id, new AtomicInteger(0));
- }
-
- @Override
- public void ack(Object id) {
- acks.get(_id).incrementAndGet();
- }
-
- @Override
- public void fail(Object id) {
- }
-
- public int getNumAcks() {
- return acks.get(_id).intValue();
- }
-
- public void resetNumAcks() {
- acks.get(_id).set(0);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/testing/BatchNumberList.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/testing/BatchNumberList.java b/jstorm-client/src/main/java/backtype/storm/testing/BatchNumberList.java
deleted file mode 100644
index dd6530e..0000000
--- a/jstorm-client/src/main/java/backtype/storm/testing/BatchNumberList.java
+++ /dev/null
@@ -1,55 +0,0 @@
-package backtype.storm.testing;
-
-import backtype.storm.coordination.BatchOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseBatchBolt;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-public class BatchNumberList extends BaseBatchBolt {
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("word", "list"));
- }
-
- String _wordComponent;
-
- public BatchNumberList(String wordComponent) {
- _wordComponent = wordComponent;
- }
-
- String word = null;
- List<Integer> intSet = new ArrayList<Integer>();
- BatchOutputCollector _collector;
-
- @Override
- public void prepare(Map conf, TopologyContext context,
- BatchOutputCollector collector, Object id) {
- _collector = collector;
- }
-
- @Override
- public void execute(Tuple tuple) {
- if (tuple.getSourceComponent().equals(_wordComponent)) {
- this.word = tuple.getString(1);
- } else {
- intSet.add(tuple.getInteger(1));
- }
- }
-
- @Override
- public void finishBatch() {
- if (word != null) {
- Collections.sort(intSet);
- _collector.emit(new Values(word, intSet));
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/testing/BatchProcessWord.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/testing/BatchProcessWord.java b/jstorm-client/src/main/java/backtype/storm/testing/BatchProcessWord.java
deleted file mode 100644
index 21f316f..0000000
--- a/jstorm-client/src/main/java/backtype/storm/testing/BatchProcessWord.java
+++ /dev/null
@@ -1,23 +0,0 @@
-package backtype.storm.testing;
-
-import backtype.storm.topology.BasicOutputCollector;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseBasicBolt;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-
-public class BatchProcessWord extends BaseBasicBolt {
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("id", "size"));
- }
-
- @Override
- public void execute(Tuple input, BasicOutputCollector collector) {
- collector.emit(new Values(input.getValue(0), input.getString(1)
- .length()));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/testing/BatchRepeatA.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/testing/BatchRepeatA.java b/jstorm-client/src/main/java/backtype/storm/testing/BatchRepeatA.java
deleted file mode 100644
index 9cb06c3..0000000
--- a/jstorm-client/src/main/java/backtype/storm/testing/BatchRepeatA.java
+++ /dev/null
@@ -1,30 +0,0 @@
-package backtype.storm.testing;
-
-import backtype.storm.topology.BasicOutputCollector;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseBasicBolt;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-
-public class BatchRepeatA extends BaseBasicBolt {
-
- @Override
- public void execute(Tuple input, BasicOutputCollector collector) {
- Object id = input.getValue(0);
- String word = input.getString(1);
- for (int i = 0; i < word.length(); i++) {
- if (word.charAt(i) == 'a') {
- collector.emit("multi", new Values(id, word.substring(0, i)));
- }
- }
- collector.emit("single", new Values(id, word));
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declareStream("multi", new Fields("id", "word"));
- declarer.declareStream("single", new Fields("id", "word"));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/testing/BoltTracker.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/testing/BoltTracker.java b/jstorm-client/src/main/java/backtype/storm/testing/BoltTracker.java
deleted file mode 100644
index fb928c8..0000000
--- a/jstorm-client/src/main/java/backtype/storm/testing/BoltTracker.java
+++ /dev/null
@@ -1,24 +0,0 @@
-package backtype.storm.testing;
-
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import java.util.HashMap;
-import java.util.Map;
-
-public class BoltTracker extends NonRichBoltTracker implements IRichBolt {
- IRichBolt _richDelegate;
-
- public BoltTracker(IRichBolt delegate, String id) {
- super(delegate, id);
- _richDelegate = delegate;
- }
-
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- _richDelegate.declareOutputFields(declarer);
- }
-
- @Override
- public Map<String, Object> getComponentConfiguration() {
- return new HashMap<String, Object>();
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/testing/CompleteTopologyParam.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/testing/CompleteTopologyParam.java b/jstorm-client/src/main/java/backtype/storm/testing/CompleteTopologyParam.java
deleted file mode 100644
index 4017ddb..0000000
--- a/jstorm-client/src/main/java/backtype/storm/testing/CompleteTopologyParam.java
+++ /dev/null
@@ -1,57 +0,0 @@
-package backtype.storm.testing;
-
-import backtype.storm.Config;
-
-/**
- * The param class for the <code>Testing.completeTopology</code>.
- */
-public class CompleteTopologyParam {
- /**
- * The mocked spout sources
- */
- private MockedSources mockedSources;
- /**
- * the config for the topology when it was submitted to the cluster
- */
- private Config stormConf;
- /**
- * whether cleanup the state?
- */
- private Boolean cleanupState;
- /**
- * the topology name you want to submit to the cluster
- */
- private String topologyName;
-
- public MockedSources getMockedSources() {
- return mockedSources;
- }
-
- public void setMockedSources(MockedSources mockedSources) {
- this.mockedSources = mockedSources;
- }
-
- public Config getStormConf() {
- return stormConf;
- }
-
- public void setStormConf(Config stormConf) {
- this.stormConf = stormConf;
- }
-
- public Boolean getCleanupState() {
- return cleanupState;
- }
-
- public void setCleanupState(Boolean cleanupState) {
- this.cleanupState = cleanupState;
- }
-
- public String getTopologyName() {
- return topologyName;
- }
-
- public void setTopologyName(String topologyName) {
- this.topologyName = topologyName;
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/testing/CountingBatchBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/testing/CountingBatchBolt.java b/jstorm-client/src/main/java/backtype/storm/testing/CountingBatchBolt.java
deleted file mode 100644
index 75dca1a..0000000
--- a/jstorm-client/src/main/java/backtype/storm/testing/CountingBatchBolt.java
+++ /dev/null
@@ -1,39 +0,0 @@
-package backtype.storm.testing;
-
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseBatchBolt;
-import backtype.storm.coordination.BatchOutputCollector;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-import java.util.Map;
-
-public class CountingBatchBolt extends BaseBatchBolt {
- BatchOutputCollector _collector;
- Object _id;
- int _count = 0;
-
- @Override
- public void prepare(Map conf, TopologyContext context,
- BatchOutputCollector collector, Object id) {
- _collector = collector;
- _id = id;
- }
-
- @Override
- public void execute(Tuple tuple) {
- _count++;
- }
-
- @Override
- public void finishBatch() {
- _collector.emit(new Values(_id, _count));
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("tx", "count"));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/testing/CountingCommitBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/testing/CountingCommitBolt.java b/jstorm-client/src/main/java/backtype/storm/testing/CountingCommitBolt.java
deleted file mode 100644
index e3533a6..0000000
--- a/jstorm-client/src/main/java/backtype/storm/testing/CountingCommitBolt.java
+++ /dev/null
@@ -1,42 +0,0 @@
-package backtype.storm.testing;
-
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.coordination.BatchOutputCollector;
-import backtype.storm.topology.base.BaseTransactionalBolt;
-import backtype.storm.transactional.ICommitter;
-import backtype.storm.transactional.TransactionAttempt;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-import java.util.Map;
-
-public class CountingCommitBolt extends BaseTransactionalBolt implements
- ICommitter {
- BatchOutputCollector _collector;
- TransactionAttempt _id;
- int _count = 0;
-
- @Override
- public void prepare(Map conf, TopologyContext context,
- BatchOutputCollector collector, TransactionAttempt id) {
- _id = id;
- _collector = collector;
- }
-
- @Override
- public void execute(Tuple tuple) {
- _count++;
- }
-
- @Override
- public void finishBatch() {
- _collector.emit(new Values(_id, _count));
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("tx", "count"));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/testing/FeederSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/testing/FeederSpout.java b/jstorm-client/src/main/java/backtype/storm/testing/FeederSpout.java
deleted file mode 100644
index 871e573..0000000
--- a/jstorm-client/src/main/java/backtype/storm/testing/FeederSpout.java
+++ /dev/null
@@ -1,83 +0,0 @@
-package backtype.storm.testing;
-
-import backtype.storm.topology.OutputFieldsDeclarer;
-import java.util.Map;
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.base.BaseRichSpout;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Values;
-import backtype.storm.utils.InprocMessaging;
-import java.util.HashMap;
-import java.util.List;
-import java.util.UUID;
-
-public class FeederSpout extends BaseRichSpout {
- private int _id;
- private Fields _outFields;
- private SpoutOutputCollector _collector;
- private AckFailDelegate _ackFailDelegate;
-
- public FeederSpout(Fields outFields) {
- _id = InprocMessaging.acquireNewPort();
- _outFields = outFields;
- }
-
- public void setAckFailDelegate(AckFailDelegate d) {
- _ackFailDelegate = d;
- }
-
- public void feed(List<Object> tuple) {
- feed(tuple, UUID.randomUUID().toString());
- }
-
- public void feed(List<Object> tuple, Object msgId) {
- InprocMessaging.sendMessage(_id, new Values(tuple, msgId));
- }
-
- public void open(Map conf, TopologyContext context,
- SpoutOutputCollector collector) {
- _collector = collector;
- }
-
- public void close() {
-
- }
-
- public void nextTuple() {
- List<Object> toEmit = (List<Object>) InprocMessaging.pollMessage(_id);
- if (toEmit != null) {
- List<Object> tuple = (List<Object>) toEmit.get(0);
- Object msgId = toEmit.get(1);
-
- _collector.emit(tuple, msgId);
- } else {
- try {
- Thread.sleep(1);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- }
- }
-
- public void ack(Object msgId) {
- if (_ackFailDelegate != null) {
- _ackFailDelegate.ack(msgId);
- }
- }
-
- public void fail(Object msgId) {
- if (_ackFailDelegate != null) {
- _ackFailDelegate.fail(msgId);
- }
- }
-
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(_outFields);
- }
-
- @Override
- public Map<String, Object> getComponentConfiguration() {
- return new HashMap<String, Object>();
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/testing/FixedTuple.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/testing/FixedTuple.java b/jstorm-client/src/main/java/backtype/storm/testing/FixedTuple.java
deleted file mode 100644
index e4cc089..0000000
--- a/jstorm-client/src/main/java/backtype/storm/testing/FixedTuple.java
+++ /dev/null
@@ -1,25 +0,0 @@
-package backtype.storm.testing;
-
-import backtype.storm.utils.Utils;
-import java.io.Serializable;
-import java.util.List;
-
-public class FixedTuple implements Serializable {
- public String stream;
- public List<Object> values;
-
- public FixedTuple(List<Object> values) {
- this.stream = Utils.DEFAULT_STREAM_ID;
- this.values = values;
- }
-
- public FixedTuple(String stream, List<Object> values) {
- this.stream = stream;
- this.values = values;
- }
-
- @Override
- public String toString() {
- return stream + ":" + "<" + values.toString() + ">";
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/testing/FixedTupleSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/testing/FixedTupleSpout.java b/jstorm-client/src/main/java/backtype/storm/testing/FixedTupleSpout.java
deleted file mode 100644
index e463df0..0000000
--- a/jstorm-client/src/main/java/backtype/storm/testing/FixedTupleSpout.java
+++ /dev/null
@@ -1,164 +0,0 @@
-package backtype.storm.testing;
-
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichSpout;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.utils.Utils;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import static backtype.storm.utils.Utils.get;
-
-public class FixedTupleSpout implements IRichSpout {
- private static final Map<String, Integer> acked = new HashMap<String, Integer>();
- private static final Map<String, Integer> failed = new HashMap<String, Integer>();
-
- public static int getNumAcked(String stormId) {
- synchronized (acked) {
- return get(acked, stormId, 0);
- }
- }
-
- public static int getNumFailed(String stormId) {
- synchronized (failed) {
- return get(failed, stormId, 0);
- }
- }
-
- public static void clear(String stormId) {
- acked.remove(stormId);
- failed.remove(stormId);
- }
-
- private List<FixedTuple> _tuples;
- private SpoutOutputCollector _collector;
-
- private TopologyContext _context;
- private List<FixedTuple> _serveTuples;
- private Map<String, FixedTuple> _pending;
-
- private String _id;
- private String _fieldName;
-
- public FixedTupleSpout(List tuples) {
- this(tuples, null);
- }
-
- public FixedTupleSpout(List tuples, String fieldName) {
- _id = UUID.randomUUID().toString();
- synchronized (acked) {
- acked.put(_id, 0);
- }
- synchronized (failed) {
- failed.put(_id, 0);
- }
- _tuples = new ArrayList<FixedTuple>();
- for (Object o : tuples) {
- FixedTuple ft;
- if (o instanceof FixedTuple) {
- ft = (FixedTuple) o;
- } else {
- ft = new FixedTuple((List) o);
- }
- _tuples.add(ft);
- }
- _fieldName = fieldName;
- }
-
- public List<FixedTuple> getSourceTuples() {
- return _tuples;
- }
-
- public int getCompleted() {
- int ackedAmt;
- int failedAmt;
-
- synchronized (acked) {
- ackedAmt = acked.get(_id);
- }
- synchronized (failed) {
- failedAmt = failed.get(_id);
- }
- return ackedAmt + failedAmt;
- }
-
- public void cleanup() {
- synchronized (acked) {
- acked.remove(_id);
- }
- synchronized (failed) {
- failed.remove(_id);
- }
- }
-
- public void open(Map conf, TopologyContext context,
- SpoutOutputCollector collector) {
- _context = context;
- List<Integer> tasks = context.getComponentTasks(context
- .getThisComponentId());
- int startIndex;
- for (startIndex = 0; startIndex < tasks.size(); startIndex++) {
- if (tasks.get(startIndex) == context.getThisTaskId()) {
- break;
- }
- }
- _collector = collector;
- _pending = new HashMap<String, FixedTuple>();
- _serveTuples = new ArrayList<FixedTuple>();
- for (int i = startIndex; i < _tuples.size(); i += tasks.size()) {
- _serveTuples.add(_tuples.get(i));
- }
- }
-
- public void close() {
- }
-
- public void nextTuple() {
- if (_serveTuples.size() > 0) {
- FixedTuple ft = _serveTuples.remove(0);
- String id = UUID.randomUUID().toString();
- _pending.put(id, ft);
- _collector.emit(ft.stream, ft.values, id);
- } else {
- Utils.sleep(100);
- }
- }
-
- public void ack(Object msgId) {
- synchronized (acked) {
- int curr = get(acked, _id, 0);
- acked.put(_id, curr + 1);
- }
- }
-
- public void fail(Object msgId) {
- synchronized (failed) {
- int curr = get(failed, _id, 0);
- failed.put(_id, curr + 1);
- }
- }
-
- @Override
- public void activate() {
- }
-
- @Override
- public void deactivate() {
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- if (_fieldName != null) {
- declarer.declare(new Fields(_fieldName));
- }
- }
-
- @Override
- public Map<String, Object> getComponentConfiguration() {
- return null;
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/testing/IdentityBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/testing/IdentityBolt.java b/jstorm-client/src/main/java/backtype/storm/testing/IdentityBolt.java
deleted file mode 100644
index b3f8d87..0000000
--- a/jstorm-client/src/main/java/backtype/storm/testing/IdentityBolt.java
+++ /dev/null
@@ -1,25 +0,0 @@
-package backtype.storm.testing;
-
-import backtype.storm.topology.BasicOutputCollector;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseBasicBolt;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-
-public class IdentityBolt extends BaseBasicBolt {
- Fields _fields;
-
- public IdentityBolt(Fields fields) {
- _fields = fields;
- }
-
- @Override
- public void execute(Tuple input, BasicOutputCollector collector) {
- collector.emit(input.getValues());
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(_fields);
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/testing/KeyedCountingBatchBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/testing/KeyedCountingBatchBolt.java b/jstorm-client/src/main/java/backtype/storm/testing/KeyedCountingBatchBolt.java
deleted file mode 100644
index 58ae380..0000000
--- a/jstorm-client/src/main/java/backtype/storm/testing/KeyedCountingBatchBolt.java
+++ /dev/null
@@ -1,45 +0,0 @@
-package backtype.storm.testing;
-
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseBatchBolt;
-import backtype.storm.coordination.BatchOutputCollector;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-import backtype.storm.utils.Utils;
-import java.util.HashMap;
-import java.util.Map;
-
-public class KeyedCountingBatchBolt extends BaseBatchBolt {
- BatchOutputCollector _collector;
- Object _id;
- Map<Object, Integer> _counts = new HashMap<Object, Integer>();
-
- @Override
- public void prepare(Map conf, TopologyContext context,
- BatchOutputCollector collector, Object id) {
- _collector = collector;
- _id = id;
- }
-
- @Override
- public void execute(Tuple tuple) {
- Object key = tuple.getValue(1);
- int curr = Utils.get(_counts, key, 0);
- _counts.put(key, curr + 1);
- }
-
- @Override
- public void finishBatch() {
- for (Object key : _counts.keySet()) {
- _collector.emit(new Values(_id, key, _counts.get(key)));
- }
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("tx", "key", "count"));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/testing/KeyedCountingCommitterBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/testing/KeyedCountingCommitterBolt.java b/jstorm-client/src/main/java/backtype/storm/testing/KeyedCountingCommitterBolt.java
deleted file mode 100644
index a170130..0000000
--- a/jstorm-client/src/main/java/backtype/storm/testing/KeyedCountingCommitterBolt.java
+++ /dev/null
@@ -1,8 +0,0 @@
-package backtype.storm.testing;
-
-import backtype.storm.transactional.ICommitter;
-
-public class KeyedCountingCommitterBolt extends KeyedCountingBatchBolt
- implements ICommitter {
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/testing/KeyedSummingBatchBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/testing/KeyedSummingBatchBolt.java b/jstorm-client/src/main/java/backtype/storm/testing/KeyedSummingBatchBolt.java
deleted file mode 100644
index c12a319..0000000
--- a/jstorm-client/src/main/java/backtype/storm/testing/KeyedSummingBatchBolt.java
+++ /dev/null
@@ -1,45 +0,0 @@
-package backtype.storm.testing;
-
-import backtype.storm.coordination.BatchOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseBatchBolt;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-import backtype.storm.utils.Utils;
-import clojure.lang.Numbers;
-import java.util.HashMap;
-import java.util.Map;
-
-public class KeyedSummingBatchBolt extends BaseBatchBolt {
- BatchOutputCollector _collector;
- Object _id;
- Map<Object, Number> _sums = new HashMap<Object, Number>();
-
- @Override
- public void prepare(Map conf, TopologyContext context,
- BatchOutputCollector collector, Object id) {
- _collector = collector;
- _id = id;
- }
-
- @Override
- public void execute(Tuple tuple) {
- Object key = tuple.getValue(1);
- Number curr = Utils.get(_sums, key, 0);
- _sums.put(key, Numbers.add(curr, tuple.getValue(2)));
- }
-
- @Override
- public void finishBatch() {
- for (Object key : _sums.keySet()) {
- _collector.emit(new Values(_id, key, _sums.get(key)));
- }
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("tx", "key", "sum"));
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/testing/MemoryTransactionalSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/testing/MemoryTransactionalSpout.java b/jstorm-client/src/main/java/backtype/storm/testing/MemoryTransactionalSpout.java
deleted file mode 100644
index bf6286f..0000000
--- a/jstorm-client/src/main/java/backtype/storm/testing/MemoryTransactionalSpout.java
+++ /dev/null
@@ -1,187 +0,0 @@
-package backtype.storm.testing;
-
-import backtype.storm.Config;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.transactional.TransactionAttempt;
-import backtype.storm.coordination.BatchOutputCollector;
-import backtype.storm.transactional.partitioned.IPartitionedTransactionalSpout;
-import backtype.storm.transactional.partitioned.IPartitionedTransactionalSpout.Emitter;
-import backtype.storm.tuple.Fields;
-import backtype.storm.utils.RegisteredGlobalState;
-import backtype.storm.utils.Utils;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class MemoryTransactionalSpout implements
- IPartitionedTransactionalSpout<MemoryTransactionalSpoutMeta> {
- public static String TX_FIELD = MemoryTransactionalSpout.class.getName()
- + "/id";
-
- private String _id;
- private String _finishedPartitionsId;
- private int _takeAmt;
- private Fields _outFields;
- private Map<Integer, List<List<Object>>> _initialPartitions;
-
- public MemoryTransactionalSpout(
- Map<Integer, List<List<Object>>> partitions, Fields outFields,
- int takeAmt) {
- _id = RegisteredGlobalState.registerState(partitions);
- Map<Integer, Boolean> finished = Collections
- .synchronizedMap(new HashMap<Integer, Boolean>());
- _finishedPartitionsId = RegisteredGlobalState.registerState(finished);
- _takeAmt = takeAmt;
- _outFields = outFields;
- _initialPartitions = partitions;
- }
-
- public boolean isExhaustedTuples() {
- Map<Integer, Boolean> statuses = getFinishedStatuses();
- for (Integer partition : getQueues().keySet()) {
- if (!statuses.containsKey(partition)
- || !getFinishedStatuses().get(partition)) {
- return false;
- }
- }
- return true;
- }
-
- class Coordinator implements IPartitionedTransactionalSpout.Coordinator {
-
- @Override
- public int numPartitions() {
- return getQueues().size();
- }
-
- @Override
- public boolean isReady() {
- return true;
- }
-
- @Override
- public void close() {
- }
- }
-
- class Emitter
- implements
- IPartitionedTransactionalSpout.Emitter<MemoryTransactionalSpoutMeta> {
-
- Integer _maxSpoutPending;
- Map<Integer, Integer> _emptyPartitions = new HashMap<Integer, Integer>();
-
- public Emitter(Map conf) {
- Object c = conf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING);
- if (c == null)
- _maxSpoutPending = 1;
- else
- _maxSpoutPending = Utils.getInt(c);
- }
-
- @Override
- public MemoryTransactionalSpoutMeta emitPartitionBatchNew(
- TransactionAttempt tx, BatchOutputCollector collector,
- int partition, MemoryTransactionalSpoutMeta lastPartitionMeta) {
- int index;
- if (lastPartitionMeta == null) {
- index = 0;
- } else {
- index = lastPartitionMeta.index + lastPartitionMeta.amt;
- }
- List<List<Object>> queue = getQueues().get(partition);
- int total = queue.size();
- int left = total - index;
- int toTake = Math.min(left, _takeAmt);
-
- MemoryTransactionalSpoutMeta ret = new MemoryTransactionalSpoutMeta(
- index, toTake);
- emitPartitionBatch(tx, collector, partition, ret);
- if (toTake == 0) {
- // this is a pretty hacky way to determine when all the
- // partitions have been committed
- // wait until we've emitted max-spout-pending empty partitions
- // for the partition
- int curr = Utils.get(_emptyPartitions, partition, 0) + 1;
- _emptyPartitions.put(partition, curr);
- if (curr > _maxSpoutPending) {
- Map<Integer, Boolean> finishedStatuses = getFinishedStatuses();
- // will be null in remote mode
- if (finishedStatuses != null) {
- finishedStatuses.put(partition, true);
- }
- }
- }
- return ret;
- }
-
- @Override
- public void emitPartitionBatch(TransactionAttempt tx,
- BatchOutputCollector collector, int partition,
- MemoryTransactionalSpoutMeta partitionMeta) {
- List<List<Object>> queue = getQueues().get(partition);
- for (int i = partitionMeta.index; i < partitionMeta.index
- + partitionMeta.amt; i++) {
- List<Object> toEmit = new ArrayList<Object>(queue.get(i));
- toEmit.add(0, tx);
- collector.emit(toEmit);
- }
- }
-
- @Override
- public void close() {
- }
- }
-
- @Override
- public IPartitionedTransactionalSpout.Coordinator getCoordinator(Map conf,
- TopologyContext context) {
- return new Coordinator();
- }
-
- @Override
- public IPartitionedTransactionalSpout.Emitter<MemoryTransactionalSpoutMeta> getEmitter(
- Map conf, TopologyContext context) {
- return new Emitter(conf);
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- List<String> toDeclare = new ArrayList<String>(_outFields.toList());
- toDeclare.add(0, TX_FIELD);
- declarer.declare(new Fields(toDeclare));
- }
-
- @Override
- public Map<String, Object> getComponentConfiguration() {
- Config conf = new Config();
- conf.registerSerialization(MemoryTransactionalSpoutMeta.class);
- return conf;
- }
-
- public void startup() {
- getFinishedStatuses().clear();
- }
-
- public void cleanup() {
- RegisteredGlobalState.clearState(_id);
- RegisteredGlobalState.clearState(_finishedPartitionsId);
- }
-
- private Map<Integer, List<List<Object>>> getQueues() {
- Map<Integer, List<List<Object>>> ret = (Map<Integer, List<List<Object>>>) RegisteredGlobalState
- .getState(_id);
- if (ret != null)
- return ret;
- else
- return _initialPartitions;
- }
-
- private Map<Integer, Boolean> getFinishedStatuses() {
- return (Map<Integer, Boolean>) RegisteredGlobalState
- .getState(_finishedPartitionsId);
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/testing/MemoryTransactionalSpoutMeta.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/testing/MemoryTransactionalSpoutMeta.java b/jstorm-client/src/main/java/backtype/storm/testing/MemoryTransactionalSpoutMeta.java
deleted file mode 100644
index 4d87a66..0000000
--- a/jstorm-client/src/main/java/backtype/storm/testing/MemoryTransactionalSpoutMeta.java
+++ /dev/null
@@ -1,21 +0,0 @@
-package backtype.storm.testing;
-
-public class MemoryTransactionalSpoutMeta {
- int index;
- int amt;
-
- // for kryo compatibility
- public MemoryTransactionalSpoutMeta() {
-
- }
-
- public MemoryTransactionalSpoutMeta(int index, int amt) {
- this.index = index;
- this.amt = amt;
- }
-
- @Override
- public String toString() {
- return "index: " + index + "; amt: " + amt;
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/testing/MkClusterParam.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/testing/MkClusterParam.java b/jstorm-client/src/main/java/backtype/storm/testing/MkClusterParam.java
deleted file mode 100644
index 8ec7102..0000000
--- a/jstorm-client/src/main/java/backtype/storm/testing/MkClusterParam.java
+++ /dev/null
@@ -1,46 +0,0 @@
-package backtype.storm.testing;
-
-import java.util.Map;
-
-/**
- * The param arg for <code>Testing.withSimulatedTimeCluster</code> and
- * <code>Testing.withTrackedCluster</code>
- */
-public class MkClusterParam {
- /**
- * count of supervisors for the cluster.
- */
- private Integer supervisors;
- /**
- * count of port for each supervisor
- */
- private Integer portsPerSupervisor;
- /**
- * cluster config
- */
- private Map daemonConf;
-
- public Integer getSupervisors() {
- return supervisors;
- }
-
- public void setSupervisors(Integer supervisors) {
- this.supervisors = supervisors;
- }
-
- public Integer getPortsPerSupervisor() {
- return portsPerSupervisor;
- }
-
- public void setPortsPerSupervisor(Integer portsPerSupervisor) {
- this.portsPerSupervisor = portsPerSupervisor;
- }
-
- public Map getDaemonConf() {
- return daemonConf;
- }
-
- public void setDaemonConf(Map daemonConf) {
- this.daemonConf = daemonConf;
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/testing/MkTupleParam.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/testing/MkTupleParam.java b/jstorm-client/src/main/java/backtype/storm/testing/MkTupleParam.java
deleted file mode 100644
index a98704d..0000000
--- a/jstorm-client/src/main/java/backtype/storm/testing/MkTupleParam.java
+++ /dev/null
@@ -1,37 +0,0 @@
-package backtype.storm.testing;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class MkTupleParam {
- private String stream;
- private String component;
- private List<String> fields;
-
- public String getStream() {
- return stream;
- }
-
- public void setStream(String stream) {
- this.stream = stream;
- }
-
- public String getComponent() {
- return component;
- }
-
- public void setComponent(String component) {
- this.component = component;
- }
-
- public List<String> getFields() {
- return fields;
- }
-
- public void setFields(String... fields) {
- this.fields = new ArrayList<String>();
- for (int i = 0; i < fields.length; i++) {
- this.fields.add(fields[i]);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/testing/MockedSources.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/testing/MockedSources.java b/jstorm-client/src/main/java/backtype/storm/testing/MockedSources.java
deleted file mode 100644
index 6c61edb..0000000
--- a/jstorm-client/src/main/java/backtype/storm/testing/MockedSources.java
+++ /dev/null
@@ -1,46 +0,0 @@
-package backtype.storm.testing;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import backtype.storm.tuple.Values;
-import backtype.storm.utils.Utils;
-
-/**
- * Accumulates {@link FixedTuple}s per spout id.
- */
-public class MockedSources {
-    /**
-     * Mocked tuples keyed by spout id; each {@link FixedTuple} carries the
-     * stream id it was registered for.
-     */
-    private Map<String, List<FixedTuple>> data = new HashMap<String, List<FixedTuple>>();
-
-    /**
-     * Adds mock data for a spout on a specific stream.
-     *
-     * @param spoutId
-     *            the spout to be mocked
-     * @param streamId
-     *            the stream of the spout to be mocked
-     * @param valueses
-     *            the mocked data; one {@link FixedTuple} is appended per
-     *            element, in order
-     */
-    public void addMockData(String spoutId, String streamId, Values... valueses) {
-        if (!data.containsKey(spoutId)) {
-            data.put(spoutId, new ArrayList<FixedTuple>());
-        }
-
-        List<FixedTuple> bucket = data.get(spoutId);
-        for (Values values : valueses) {
-            bucket.add(new FixedTuple(streamId, values));
-        }
-    }
-
-    /**
-     * Adds mock data for a spout on {@link Utils#DEFAULT_STREAM_ID}.
-     *
-     * @param spoutId
-     *            the spout to be mocked
-     * @param valueses
-     *            the mocked data
-     */
-    public void addMockData(String spoutId, Values... valueses) {
-        addMockData(spoutId, Utils.DEFAULT_STREAM_ID, valueses);
-    }
-
-    /**
-     * @return the internal spout-id to tuples map (the live map, not a copy)
-     */
-    public Map<String, List<FixedTuple>> getData() {
-        return data;
-    }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/testing/NGrouping.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/testing/NGrouping.java b/jstorm-client/src/main/java/backtype/storm/testing/NGrouping.java
deleted file mode 100644
index 9954b06..0000000
--- a/jstorm-client/src/main/java/backtype/storm/testing/NGrouping.java
+++ /dev/null
@@ -1,34 +0,0 @@
-package backtype.storm.testing;
-
-import backtype.storm.generated.GlobalStreamId;
-import backtype.storm.grouping.CustomStreamGrouping;
-import backtype.storm.task.WorkerTopologyContext;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-public class NGrouping implements CustomStreamGrouping {
- int _n;
- List<Integer> _outTasks;
-
- public NGrouping(int n) {
- _n = n;
- }
-
- @Override
- public void prepare(WorkerTopologyContext context, GlobalStreamId stream,
- List<Integer> targetTasks) {
- targetTasks = new ArrayList<Integer>(targetTasks);
- Collections.sort(targetTasks);
- _outTasks = new ArrayList<Integer>();
- for (int i = 0; i < _n; i++) {
- _outTasks.add(targetTasks.get(i));
- }
- }
-
- @Override
- public List<Integer> chooseTasks(int taskId, List<Object> values) {
- return _outTasks;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/testing/NonRichBoltTracker.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/testing/NonRichBoltTracker.java b/jstorm-client/src/main/java/backtype/storm/testing/NonRichBoltTracker.java
deleted file mode 100644
index c5918ad..0000000
--- a/jstorm-client/src/main/java/backtype/storm/testing/NonRichBoltTracker.java
+++ /dev/null
@@ -1,34 +0,0 @@
-package backtype.storm.testing;
-
-import backtype.storm.task.IBolt;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.utils.RegisteredGlobalState;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * {@link IBolt} wrapper that forwards every call to a delegate and, after
- * each executed tuple, increments the "processed" counter found in the
- * {@link RegisteredGlobalState} entry identified by the track id.
- */
-public class NonRichBoltTracker implements IBolt {
-    IBolt _delegate;
-    String _trackId;
-
-    /**
-     * @param delegate
-     *            the bolt that does the actual work
-     * @param id
-     *            id of the registered global state holding the counters
-     */
-    public NonRichBoltTracker(IBolt delegate, String id) {
-        _trackId = id;
-        _delegate = delegate;
-    }
-
-    /** Forwards to the delegate unchanged. */
-    public void prepare(Map stormConf, TopologyContext context,
-            OutputCollector collector) {
-        _delegate.prepare(stormConf, context, collector);
-    }
-
-    /**
-     * Runs the delegate first; the counter is only bumped if the delegate
-     * returns normally.
-     */
-    public void execute(Tuple input) {
-        _delegate.execute(input);
-
-        Map stats = (Map) RegisteredGlobalState.getState(_trackId);
-        AtomicInteger processed = (AtomicInteger) stats.get("processed");
-        processed.incrementAndGet();
-    }
-
-    /** Forwards to the delegate unchanged. */
-    public void cleanup() {
-        _delegate.cleanup();
-    }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/testing/OpaqueMemoryTransactionalSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/testing/OpaqueMemoryTransactionalSpout.java b/jstorm-client/src/main/java/backtype/storm/testing/OpaqueMemoryTransactionalSpout.java
deleted file mode 100644
index 371c622..0000000
--- a/jstorm-client/src/main/java/backtype/storm/testing/OpaqueMemoryTransactionalSpout.java
+++ /dev/null
@@ -1,190 +0,0 @@
-package backtype.storm.testing;
-
-import backtype.storm.Config;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.transactional.TransactionAttempt;
-import backtype.storm.coordination.BatchOutputCollector;
-import backtype.storm.transactional.partitioned.IOpaquePartitionedTransactionalSpout;
-import backtype.storm.tuple.Fields;
-import backtype.storm.utils.RegisteredGlobalState;
-import backtype.storm.utils.Utils;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * In-memory opaque partitioned transactional spout. This spout only works in
- * local mode.
- *
- * <p>
- * The partition queues and the per-partition "finished" and "disabled" flag
- * maps are not stored in this object; they are registered in
- * {@link RegisteredGlobalState} and looked up by id on every access.
- */
-public class OpaqueMemoryTransactionalSpout implements
-        IOpaquePartitionedTransactionalSpout<MemoryTransactionalSpoutMeta> {
-    // The name is derived from MemoryTransactionalSpout (not this class);
-    // it is a runtime field name and is deliberately left as is.
-    public static String TX_FIELD = MemoryTransactionalSpout.class.getName()
-            + "/id";
-
-    private String _id;
-    private String _finishedPartitionsId;
-    private String _disabledId;
-    private int _takeAmt;
-    private Fields _outFields;
-
-    /**
-     * @param partitions
-     *            tuples to emit, keyed by partition
-     * @param outFields
-     *            fields of the queued tuples (the transaction field is
-     *            prepended when declaring output fields)
-     * @param takeAmt
-     *            maximum number of tuples taken from a partition per batch
-     */
-    public OpaqueMemoryTransactionalSpout(
-            Map<Integer, List<List<Object>>> partitions, Fields outFields,
-            int takeAmt) {
-        _id = RegisteredGlobalState.registerState(partitions);
-
-        Map<Integer, Boolean> finished = Collections
-                .synchronizedMap(new HashMap<Integer, Boolean>());
-        _finishedPartitionsId = RegisteredGlobalState.registerState(finished);
-
-        Map<Integer, Boolean> disabled = Collections
-                .synchronizedMap(new HashMap<Integer, Boolean>());
-        _disabledId = RegisteredGlobalState.registerState(disabled);
-
-        _takeAmt = takeAmt;
-        _outFields = outFields;
-    }
-
-    /**
-     * Stores the given flag for the partition in the disabled-status map.
-     */
-    public void setDisabled(Integer partition, boolean disabled) {
-        getDisabledStatuses().put(partition, disabled);
-    }
-
-    /**
-     * @return {@code true} only if every partition of the queue map has been
-     *         marked finished
-     */
-    public boolean isExhaustedTuples() {
-        Map<Integer, Boolean> statuses = getFinishedStatuses();
-        for (Integer partition : getQueues().keySet()) {
-            // Single null-safe lookup: a missing entry counts as "not
-            // finished" and cannot blow up on unboxing if the map is
-            // cleared concurrently by startup().
-            if (!Boolean.TRUE.equals(statuses.get(partition))) {
-                return false;
-            }
-        }
-        return true;
-    }
-
-    @Override
-    public IOpaquePartitionedTransactionalSpout.Emitter<MemoryTransactionalSpoutMeta> getEmitter(
-            Map conf, TopologyContext context) {
-        return new Emitter(conf);
-    }
-
-    @Override
-    public IOpaquePartitionedTransactionalSpout.Coordinator getCoordinator(
-            Map conf, TopologyContext context) {
-        return new Coordinator();
-    }
-
-    /** Coordinator that is always ready and holds no resources. */
-    class Coordinator implements
-            IOpaquePartitionedTransactionalSpout.Coordinator {
-        @Override
-        public boolean isReady() {
-            return true;
-        }
-
-        @Override
-        public void close() {
-        }
-    }
-
-    /** Emits slices of the registered partition queues. */
-    class Emitter
-            implements
-            IOpaquePartitionedTransactionalSpout.Emitter<MemoryTransactionalSpoutMeta> {
-
-        Integer _maxSpoutPending;
-        // Per partition: how many empty batches have been emitted so far.
-        Map<Integer, Integer> _emptyPartitions = new HashMap<Integer, Integer>();
-
-        /**
-         * Reads {@link Config#TOPOLOGY_MAX_SPOUT_PENDING} from the conf,
-         * defaulting to 1 when it is absent.
-         */
-        public Emitter(Map conf) {
-            Object c = conf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING);
-            if (c == null) {
-                _maxSpoutPending = 1;
-            } else {
-                _maxSpoutPending = Utils.getInt(c);
-            }
-        }
-
-        /**
-         * Emits up to {@code takeAmt} tuples of the partition, starting
-         * right after the slice described by {@code lastPartitionMeta}
-         * (or at index 0 when it is {@code null}); each emitted tuple gets
-         * the transaction attempt prepended.
-         *
-         * @return metadata describing the emitted slice, or {@code null}
-         *         when nothing was attempted because of the disabled guard
-         */
-        @Override
-        public MemoryTransactionalSpoutMeta emitPartitionBatch(
-                TransactionAttempt tx, BatchOutputCollector collector,
-                int partition, MemoryTransactionalSpoutMeta lastPartitionMeta) {
-            // NOTE(review): emission is skipped only when the stored status
-            // is exactly FALSE; a partition with no entry, or one stored
-            // with disabled == true, still emits. This looks inverted with
-            // respect to the name setDisabled -- confirm against callers
-            // before changing; behaviour is preserved here.
-            if (Boolean.FALSE.equals(getDisabledStatuses().get(partition))) {
-                return null;
-            }
-
-            int index;
-            if (lastPartitionMeta == null) {
-                index = 0;
-            } else {
-                index = lastPartitionMeta.index + lastPartitionMeta.amt;
-            }
-            List<List<Object>> queue = getQueues().get(partition);
-            int total = queue.size();
-            int left = total - index;
-            int toTake = Math.min(left, _takeAmt);
-
-            MemoryTransactionalSpoutMeta ret = new MemoryTransactionalSpoutMeta(
-                    index, toTake);
-            for (int i = ret.index; i < ret.index + ret.amt; i++) {
-                List<Object> toEmit = new ArrayList<Object>(queue.get(i));
-                toEmit.add(0, tx);
-                collector.emit(toEmit);
-            }
-            if (toTake == 0) {
-                // this is a pretty hacky way to determine when all the
-                // partitions have been committed:
-                // wait until we've emitted more than max-spout-pending
-                // empty batches for the partition
-                int curr = Utils.get(_emptyPartitions, partition, 0) + 1;
-                _emptyPartitions.put(partition, curr);
-                if (curr > _maxSpoutPending) {
-                    getFinishedStatuses().put(partition, true);
-                }
-            }
-            return ret;
-        }
-
-        @Override
-        public void close() {
-        }
-
-        @Override
-        public int numPartitions() {
-            return getQueues().size();
-        }
-    }
-
-    /** Declares {@link #TX_FIELD} followed by the configured out fields. */
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        List<String> toDeclare = new ArrayList<String>(_outFields.toList());
-        toDeclare.add(0, TX_FIELD);
-        declarer.declare(new Fields(toDeclare));
-    }
-
-    /** Registers {@link MemoryTransactionalSpoutMeta} for serialization. */
-    @Override
-    public Map<String, Object> getComponentConfiguration() {
-        Config conf = new Config();
-        conf.registerSerialization(MemoryTransactionalSpoutMeta.class);
-        return conf;
-    }
-
-    /** Clears all "finished" flags. */
-    public void startup() {
-        getFinishedStatuses().clear();
-    }
-
-    /**
-     * Removes every entry this spout registered in
-     * {@link RegisteredGlobalState}: the queues, the finished flags and the
-     * disabled flags (the latter was previously left behind).
-     */
-    public void cleanup() {
-        RegisteredGlobalState.clearState(_id);
-        RegisteredGlobalState.clearState(_finishedPartitionsId);
-        RegisteredGlobalState.clearState(_disabledId);
-    }
-
-    @SuppressWarnings("unchecked")
-    private Map<Integer, List<List<Object>>> getQueues() {
-        return (Map<Integer, List<List<Object>>>) RegisteredGlobalState
-                .getState(_id);
-    }
-
-    @SuppressWarnings("unchecked")
-    private Map<Integer, Boolean> getFinishedStatuses() {
-        return (Map<Integer, Boolean>) RegisteredGlobalState
-                .getState(_finishedPartitionsId);
-    }
-
-    @SuppressWarnings("unchecked")
-    private Map<Integer, Boolean> getDisabledStatuses() {
-        return (Map<Integer, Boolean>) RegisteredGlobalState
-                .getState(_disabledId);
-    }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/testing/PrepareBatchBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/testing/PrepareBatchBolt.java b/jstorm-client/src/main/java/backtype/storm/testing/PrepareBatchBolt.java
deleted file mode 100644
index 207455b..0000000
--- a/jstorm-client/src/main/java/backtype/storm/testing/PrepareBatchBolt.java
+++ /dev/null
@@ -1,33 +0,0 @@
-package backtype.storm.testing;
-
-import backtype.storm.topology.BasicOutputCollector;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseBasicBolt;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.utils.Utils;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Basic bolt that re-emits each input tuple with a random long id prepended
- * to its values.
- */
-public class PrepareBatchBolt extends BaseBasicBolt {
-    Fields _outFields;
-
-    /**
-     * @param outFields
-     *            the fields this bolt declares for its output
-     */
-    public PrepareBatchBolt(Fields outFields) {
-        _outFields = outFields;
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        declarer.declare(_outFields);
-    }
-
-    /**
-     * Emits {@code [id, v0, v1, ...]} where {@code id} comes from
-     * {@code Utils.secureRandomLong()} and {@code v*} are the input values.
-     */
-    @Override
-    public void execute(Tuple input, BasicOutputCollector collector) {
-        long batchId = Utils.secureRandomLong();
-
-        List<Object> outgoing = new ArrayList<Object>();
-        outgoing.add(batchId);
-        outgoing.addAll(input.getValues());
-
-        collector.emit(outgoing);
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/testing/SpoutTracker.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/testing/SpoutTracker.java b/jstorm-client/src/main/java/backtype/storm/testing/SpoutTracker.java
deleted file mode 100644
index a712ee8..0000000
--- a/jstorm-client/src/main/java/backtype/storm/testing/SpoutTracker.java
+++ /dev/null
@@ -1,88 +0,0 @@
-package backtype.storm.testing;
-
-import backtype.storm.spout.ISpoutOutputCollector;
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichSpout;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichSpout;
-import backtype.storm.utils.RegisteredGlobalState;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * Spout wrapper that forwards every call to a delegate {@link IRichSpout}
- * and updates counters stored in the {@link RegisteredGlobalState} entry
- * identified by the track id: "spout-emitted" on each emit, "processed" on
- * each ack or fail.
- */
-public class SpoutTracker extends BaseRichSpout {
-    IRichSpout _delegate;
-    SpoutTrackOutputCollector _tracker;
-    String _trackId;
-
-    /**
-     * Collector handed to the delegate; passes emits through to the real
-     * collector and then records them.
-     */
-    private class SpoutTrackOutputCollector implements ISpoutOutputCollector {
-        public int transferred = 0;
-        public int emitted = 0;
-        public SpoutOutputCollector _collector;
-
-        public SpoutTrackOutputCollector(SpoutOutputCollector collector) {
-            _collector = collector;
-        }
-
-        /** Increments the "spout-emitted" counter of the tracked state. */
-        private void recordSpoutEmit() {
-            Map stats = (Map) RegisteredGlobalState.getState(_trackId);
-            AtomicInteger spoutEmitted = (AtomicInteger) stats
-                    .get("spout-emitted");
-            spoutEmitted.incrementAndGet();
-        }
-
-        public List<Integer> emit(String streamId, List<Object> tuple,
-                Object messageId) {
-            List<Integer> result = _collector.emit(streamId, tuple, messageId);
-            recordSpoutEmit();
-            return result;
-        }
-
-        public void emitDirect(int taskId, String streamId, List<Object> tuple,
-                Object messageId) {
-            _collector.emitDirect(taskId, streamId, tuple, messageId);
-            recordSpoutEmit();
-        }
-
-        @Override
-        public void reportError(Throwable error) {
-            _collector.reportError(error);
-        }
-    }
-
-    /**
-     * @param delegate
-     *            the spout that does the actual work
-     * @param trackId
-     *            id of the registered global state holding the counters
-     */
-    public SpoutTracker(IRichSpout delegate, String trackId) {
-        _trackId = trackId;
-        _delegate = delegate;
-    }
-
-    /**
-     * Opens the delegate with a collector that wraps {@code collector} in
-     * the tracking collector.
-     */
-    public void open(Map conf, TopologyContext context,
-            SpoutOutputCollector collector) {
-        _tracker = new SpoutTrackOutputCollector(collector);
-        SpoutOutputCollector tracking = new SpoutOutputCollector(_tracker);
-        _delegate.open(conf, context, tracking);
-    }
-
-    public void close() {
-        _delegate.close();
-    }
-
-    public void nextTuple() {
-        _delegate.nextTuple();
-    }
-
-    /** Acks on the delegate, then counts the message as processed. */
-    public void ack(Object msgId) {
-        _delegate.ack(msgId);
-
-        Map stats = (Map) RegisteredGlobalState.getState(_trackId);
-        AtomicInteger processed = (AtomicInteger) stats.get("processed");
-        processed.incrementAndGet();
-    }
-
-    /** Fails on the delegate, then counts the message as processed. */
-    public void fail(Object msgId) {
-        _delegate.fail(msgId);
-
-        Map stats = (Map) RegisteredGlobalState.getState(_trackId);
-        AtomicInteger processed = (AtomicInteger) stats.get("processed");
-        processed.incrementAndGet();
-    }
-
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        _delegate.declareOutputFields(declarer);
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/testing/TestAggregatesCounter.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/testing/TestAggregatesCounter.java b/jstorm-client/src/main/java/backtype/storm/testing/TestAggregatesCounter.java
deleted file mode 100644
index 359fa00..0000000
--- a/jstorm-client/src/main/java/backtype/storm/testing/TestAggregatesCounter.java
+++ /dev/null
@@ -1,46 +0,0 @@
-package backtype.storm.testing;
-
-import backtype.storm.topology.base.BaseRichBolt;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Fields;
-import java.util.Map;
-import backtype.storm.task.TopologyContext;
-import java.util.HashMap;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import static backtype.storm.utils.Utils.tuple;
-
-public class TestAggregatesCounter extends BaseRichBolt {
- public static Logger LOG = LoggerFactory.getLogger(TestWordCounter.class);
-
- Map<String, Integer> _counts;
- OutputCollector _collector;
-
- public void prepare(Map stormConf, TopologyContext context,
- OutputCollector collector) {
- _collector = collector;
- _counts = new HashMap<String, Integer>();
- }
-
- public void execute(Tuple input) {
- String word = (String) input.getValues().get(0);
- int count = (Integer) input.getValues().get(1);
- _counts.put(word, count);
- int globalCount = 0;
- for (String w : _counts.keySet()) {
- globalCount += _counts.get(w);
- }
- _collector.emit(tuple(globalCount));
- _collector.ack(input);
- }
-
- public void cleanup() {
-
- }
-
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("agg-global"));
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/testing/TestConfBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/testing/TestConfBolt.java b/jstorm-client/src/main/java/backtype/storm/testing/TestConfBolt.java
deleted file mode 100644
index c6fe3d6..0000000
--- a/jstorm-client/src/main/java/backtype/storm/testing/TestConfBolt.java
+++ /dev/null
@@ -1,44 +0,0 @@
-package backtype.storm.testing;
-
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.BasicOutputCollector;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseBasicBolt;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-import java.util.Map;
-
-/**
- * Basic bolt that answers each input tuple with the value the topology
- * configuration holds for the name carried in the tuple's first field.
- */
-public class TestConfBolt extends BaseBasicBolt {
-    Map<String, Object> _componentConf;
-    Map<String, Object> _conf;
-
-    /** Creates a bolt whose component configuration is {@code null}. */
-    public TestConfBolt() {
-        this(null);
-    }
-
-    /**
-     * @param componentConf
-     *            the map returned from {@link #getComponentConfiguration()}
-     */
-    public TestConfBolt(Map<String, Object> componentConf) {
-        _componentConf = componentConf;
-    }
-
-    /** Remembers the configuration map passed in at prepare time. */
-    @Override
-    public void prepare(Map conf, TopologyContext context) {
-        _conf = conf;
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        declarer.declare(new Fields("conf", "value"));
-    }
-
-    /**
-     * Emits {@code (name, conf.get(name))}, where {@code name} is the
-     * string in field 0 of the input.
-     */
-    @Override
-    public void execute(Tuple input, BasicOutputCollector collector) {
-        String name = input.getString(0);
-        Object value = _conf.get(name);
-        collector.emit(new Values(name, value));
-    }
-
-    /**
-     * @return the map given to the constructor; {@code null} when the
-     *         no-argument constructor was used
-     */
-    @Override
-    public Map<String, Object> getComponentConfiguration() {
-        return _componentConf;
-    }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/testing/TestGlobalCount.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/testing/TestGlobalCount.java b/jstorm-client/src/main/java/backtype/storm/testing/TestGlobalCount.java
deleted file mode 100644
index 9f9e421..0000000
--- a/jstorm-client/src/main/java/backtype/storm/testing/TestGlobalCount.java
+++ /dev/null
@@ -1,43 +0,0 @@
-package backtype.storm.testing;
-
-import backtype.storm.topology.base.BaseRichBolt;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Fields;
-import java.util.Map;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.tuple.Values;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Bolt that counts the tuples it has executed since {@code prepare} and
- * emits the running count ("global-count") for every input.
- */
-public class TestGlobalCount extends BaseRichBolt {
-    // Logger is named after this class (it was previously created for
-    // TestWordCounter, a copy-paste slip).
-    public static Logger LOG = LoggerFactory.getLogger(TestGlobalCount.class);
-
-    private int _count;
-    OutputCollector _collector;
-
-    /** Stores the collector and resets the counter to zero. */
-    public void prepare(Map stormConf, TopologyContext context,
-            OutputCollector collector) {
-        _collector = collector;
-        _count = 0;
-    }
-
-    /**
-     * Increments the counter, emits it anchored to the input, then acks the
-     * input.
-     */
-    public void execute(Tuple input) {
-        _count++;
-        _collector.emit(input, new Values(_count));
-        _collector.ack(input);
-    }
-
-    public void cleanup() {
-
-    }
-
-    /** @return a new single-field {@link Fields} of "global-count" */
-    public Fields getOutputFields() {
-        return new Fields("global-count");
-    }
-
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        declarer.declare(new Fields("global-count"));
-    }
-}