You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/11/05 21:41:05 UTC

[26/60] [abbrv] [partial] storm git commit: Release 2.0.4-SNAPSHOT

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/task/ShellBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/task/ShellBolt.java b/jstorm-client/src/main/java/backtype/storm/task/ShellBolt.java
deleted file mode 100644
index ba79f7d..0000000
--- a/jstorm-client/src/main/java/backtype/storm/task/ShellBolt.java
+++ /dev/null
@@ -1,383 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package backtype.storm.task;
-
-import backtype.storm.Config;
-import backtype.storm.Constants;
-import backtype.storm.generated.ShellComponent;
-import backtype.storm.metric.api.IMetric;
-import backtype.storm.metric.api.rpc.IShellMetric;
-import backtype.storm.multilang.BoltMsg;
-import backtype.storm.multilang.ShellMsg;
-import backtype.storm.topology.ReportedFailedException;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.utils.ShellProcess;
-import clojure.lang.RT;
-import com.google.common.util.concurrent.MoreExecutors;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.*;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-
-import static java.util.concurrent.TimeUnit.SECONDS;
-
-/**
- * A bolt that shells out to another process to process tuples. ShellBolt
- * communicates with that process over stdio using a special protocol. An ~100
- * line library is required to implement that protocol, and adapter libraries
- * currently exist for Ruby and Python.
- *
- * <p>To run a ShellBolt on a cluster, the scripts that are shelled out to must be
- * in the resources directory within the jar submitted to the master.
- * During development/testing on a local machine, that resources directory just
- * needs to be on the classpath.</p>
- *
- * <p>When creating topologies using the Java API, subclass this bolt and implement
- * the IRichBolt interface to create components for the topology that use other languages. For example:
- * </p>
- *
- * <pre>
- * public class MyBolt extends ShellBolt implements IRichBolt {
- *      public MyBolt() {
- *          super("python", "mybolt.py");
- *      }
- *
- *      public void declareOutputFields(OutputFieldsDeclarer declarer) {
- *          declarer.declare(new Fields("field1", "field2"));
- *      }
- * }
- * </pre>
- */
-public class ShellBolt implements IBolt {
-    /** Stream id set on the heartbeat messages this bolt writes to the subprocess. */
-    public static final String HEARTBEAT_STREAM_ID = "__heartbeat";
-    public static Logger LOG = LoggerFactory.getLogger(ShellBolt.class);
-    Process _subprocess;
-    OutputCollector _collector;
-    /** Tuples passed to {@link #execute} that have not been acked or failed yet, keyed by generated id. */
-    Map<String, Tuple> _inputs = new ConcurrentHashMap<String, Tuple>();
-
-    private String[] _command;
-    private ShellProcess _process;
-    /** Cleared by {@link #cleanup()}; the reader and writer loops stop once it is false. */
-    private volatile boolean _running = true;
-    /** Set by {@link #die}; rethrown from the next {@link #execute} call. */
-    private volatile Throwable _exception;
-    /** Items waiting for the writer thread: {@link BoltMsg} instances or lists of task ids. */
-    private LinkedBlockingQueue<Object> _pendingWrites = new LinkedBlockingQueue<Object>();
-    private Random _rand;
-
-    private Thread _readerThread;
-    private Thread _writerThread;
-
-    private TopologyContext _context;
-
-    private int workerTimeoutMills;
-    private ScheduledExecutorService heartBeatExecutorService;
-    /** Wall-clock time (ms) of the last "sync" received from the subprocess. */
-    private AtomicLong lastHeartbeatTimestamp = new AtomicLong();
-    /** Raised by the heartbeat timer; the writer thread sends a heartbeat and lowers it again. */
-    private AtomicBoolean sendHeartbeatFlag = new AtomicBoolean(false);
-
-    /**
-     * Creates a bolt from a ShellComponent, using its execution command and script.
-     */
-    public ShellBolt(ShellComponent component) {
-        this(component.get_execution_command(), component.get_script());
-    }
-
-    /**
-     * Creates a bolt that runs the given command line.
-     *
-     * @param command the command handed to the {@link ShellProcess}
-     */
-    public ShellBolt(String... command) {
-        _command = command;
-    }
-
-    /**
-     * Launches the subprocess, starts the reader and writer threads and
-     * schedules the heartbeat check to run once per second.
-     *
-     * <p>If {@code Config.TOPOLOGY_SHELLBOLT_MAX_PENDING} is set, the pending
-     * write queue is bounded to that capacity.</p>
-     */
-    public void prepare(Map stormConf, TopologyContext context,
-                        final OutputCollector collector) {
-        Object maxPending = stormConf.get(Config.TOPOLOGY_SHELLBOLT_MAX_PENDING);
-        if (maxPending != null) {
-            this._pendingWrites = new LinkedBlockingQueue<Object>(((Number) maxPending).intValue());
-        }
-        _rand = new Random();
-        _collector = collector;
-
-        _context = context;
-
-        workerTimeoutMills = 1000 * RT.intCast(stormConf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS));
-
-        _process = new ShellProcess(_command);
-
-        //subprocesses must send their pid first thing
-        Number subpid = _process.launch(stormConf, context);
-        LOG.info("Launched subprocess with pid " + subpid);
-
-        // reader
-        _readerThread = new Thread(new BoltReaderRunnable());
-        _readerThread.start();
-
-        // writer
-        _writerThread = new Thread(new BoltWriterRunnable());
-        _writerThread.start();
-
-        heartBeatExecutorService = MoreExecutors.getExitingScheduledExecutorService(new ScheduledThreadPoolExecutor(1));
-        heartBeatExecutorService.scheduleAtFixedRate(new BoltHeartbeatTimerTask(this), 1, 1, TimeUnit.SECONDS);
-
-        LOG.info("Start checking heartbeat...");
-        setHeartbeat();
-    }
-
-    /**
-     * Registers the tuple under a freshly generated id and queues it for the
-     * writer thread.
-     *
-     * @throws RuntimeException if the bolt has already died, or if the thread
-     *         is interrupted while waiting for room in the pending queue
-     */
-    public void execute(Tuple input) {
-        if (_exception != null) {
-            throw new RuntimeException(_exception);
-        }
-
-        //just need an id
-        String genId = Long.toString(_rand.nextLong());
-        _inputs.put(genId, input);
-        try {
-            BoltMsg boltMsg = createBoltMessage(input, genId);
-
-            _pendingWrites.put(boltMsg);
-        } catch (InterruptedException e) {
-            // The message never reached the queue, so the subprocess can never
-            // ack or fail it: stop tracking the tuple instead of leaking it.
-            _inputs.remove(genId);
-            String processInfo = _process.getProcessInfoString() + _process.getProcessTerminationInfoString();
-            // Keep the interrupt visible to the calling thread.
-            Thread.currentThread().interrupt();
-            throw new RuntimeException("Error during multilang processing " + processInfo, e);
-        }
-    }
-
-    /** Copies id, source component, stream, task and values of a tuple into a BoltMsg. */
-    private BoltMsg createBoltMessage(Tuple input, String genId) {
-        BoltMsg boltMsg = new BoltMsg();
-        boltMsg.setId(genId);
-        boltMsg.setComp(input.getSourceComponent());
-        boltMsg.setStream(input.getSourceStreamId());
-        boltMsg.setTask(input.getSourceTask());
-        boltMsg.setTuple(input.getValues());
-        return boltMsg;
-    }
-
-    /**
-     * Stops the heartbeat check and both worker threads, destroys the
-     * subprocess and forgets all pending tuples.
-     */
-    public void cleanup() {
-        _running = false;
-        // Each resource is guarded: prepare() may have failed (or never run)
-        // before it was created.
-        if (heartBeatExecutorService != null) {
-            heartBeatExecutorService.shutdownNow();
-        }
-        if (_writerThread != null) {
-            _writerThread.interrupt();
-        }
-        if (_readerThread != null) {
-            _readerThread.interrupt();
-        }
-        if (_process != null) {
-            _process.destroy();
-        }
-        _inputs.clear();
-    }
-
-    /** Acks the pending tuple registered under {@code id}; the id must still be pending. */
-    private void handleAck(Object id) {
-        Tuple acked = _inputs.remove(id);
-        if (acked == null) {
-            throw new RuntimeException("Acked a non-existent or already acked/failed id: " + id);
-        }
-        _collector.ack(acked);
-    }
-
-    /** Fails the pending tuple registered under {@code id}; the id must still be pending. */
-    private void handleFail(Object id) {
-        Tuple failed = _inputs.remove(id);
-        if (failed == null) {
-            throw new RuntimeException("Failed a non-existent or already acked/failed id: " + id);
-        }
-        _collector.fail(failed);
-    }
-
-    /** Reports an error message received from the subprocess to the collector. */
-    private void handleError(String msg) {
-        _collector.reportError(new Exception("Shell Process Exception: " + msg));
-    }
-
-    /**
-     * Emits the tuple described by the message, anchored on the still-pending
-     * tuples it names. A task of 0 means a regular emit; any other value is a
-     * direct emit to that task.
-     */
-    private void handleEmit(ShellMsg shellMsg) throws InterruptedException {
-        List<Tuple> anchors = new ArrayList<Tuple>();
-        List<String> recvAnchors = shellMsg.getAnchors();
-        if (recvAnchors != null) {
-            for (String anchor : recvAnchors) {
-                Tuple t = _inputs.get(anchor);
-                if (t == null) {
-                    throw new RuntimeException("Anchored onto " + anchor + " after ack/fail");
-                }
-                anchors.add(t);
-            }
-        }
-
-        if (shellMsg.getTask() == 0) {
-            List<Integer> outtasks = _collector.emit(shellMsg.getStream(), anchors, shellMsg.getTuple());
-            if (shellMsg.areTaskIdsNeeded()) {
-                // queued so the writer thread sends the task ids back to the subprocess
-                _pendingWrites.put(outtasks);
-            }
-        } else {
-            _collector.emitDirect((int) shellMsg.getTask(),
-                    shellMsg.getStream(), anchors, shellMsg.getTuple());
-        }
-    }
-
-    /**
-     * Logs a subprocess message at the level it requested; ERROR messages are
-     * additionally reported to the collector.
-     */
-    private void handleLog(ShellMsg shellMsg) {
-        String msg = shellMsg.getMsg();
-        msg = "ShellLog " + _process.getProcessInfoString() + " " + msg;
-        ShellMsg.ShellLogLevel logLevel = shellMsg.getLogLevel();
-
-        switch (logLevel) {
-            case TRACE:
-                LOG.trace(msg);
-                break;
-            case DEBUG:
-                LOG.debug(msg);
-                break;
-            case INFO:
-                LOG.info(msg);
-                break;
-            case WARN:
-                LOG.warn(msg);
-                break;
-            case ERROR:
-                LOG.error(msg);
-                _collector.reportError(new ReportedFailedException(msg));
-                break;
-            default:
-                LOG.info(msg);
-                break;
-        }
-    }
-
-    /**
-     * Forwards a metric update from the subprocess to the registered
-     * {@link IShellMetric} of the same name.
-     *
-     * @throws RuntimeException if the name is missing or empty, no metric is
-     *         registered under it, or the metric is not an IShellMetric
-     */
-    private void handleMetrics(ShellMsg shellMsg) {
-        //get metric name
-        String name = shellMsg.getMetricName();
-        // a missing name is treated like an empty one instead of failing with an NPE
-        if (name == null || name.isEmpty()) {
-            throw new RuntimeException("Receive Metrics name is empty");
-        }
-
-        //get metric by name
-        IMetric iMetric = _context.getRegisteredMetricByName(name);
-        if (iMetric == null) {
-            throw new RuntimeException("Could not find metric by name["+name+"] ");
-        }
-        if ( !(iMetric instanceof IShellMetric)) {
-            throw new RuntimeException("Metric["+name+"] is not IShellMetric, can not call by RPC");
-        }
-        IShellMetric iShellMetric = (IShellMetric)iMetric;
-
-        //call updateMetricFromRPC with params
-        Object paramsObj = shellMsg.getMetricParams();
-        try {
-            iShellMetric.updateMetricFromRPC(paramsObj);
-        } catch (RuntimeException re) {
-            throw re;
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    /** Records "now" as the time of the last heartbeat. */
-    private void setHeartbeat() {
-        lastHeartbeatTimestamp.set(System.currentTimeMillis());
-    }
-
-    private long getLastHeartbeat() {
-        return lastHeartbeatTimestamp.get();
-    }
-
-    /**
-     * Records and reports a fatal problem. Exits the JVM with status 11 while
-     * the bolt is still running, or whenever the problem is an {@link Error}.
-     */
-    private void die(Throwable exception) {
-        String processInfo = _process.getProcessInfoString() + _process.getProcessTerminationInfoString();
-        _exception = new RuntimeException(processInfo, exception);
-        LOG.error("Halting process: ShellBolt died.", exception);
-        _collector.reportError(exception);
-        if (_running || (exception instanceof Error)) { //don't exit if not running, unless it is an Error
-            System.exit(11);
-        }
-    }
-
-    /**
-     * Periodic check: kills the bolt when no heartbeat arrived within the
-     * worker timeout, then asks the writer thread to send the next heartbeat.
-     */
-    private class BoltHeartbeatTimerTask extends TimerTask {
-        private ShellBolt bolt;
-
-        public BoltHeartbeatTimerTask(ShellBolt bolt) {
-            this.bolt = bolt;
-        }
-
-        @Override
-        public void run() {
-            long currentTimeMillis = System.currentTimeMillis();
-            long lastHeartbeat = getLastHeartbeat();
-
-            LOG.debug("BOLT - current time : {}, last heartbeat : {}, worker timeout (ms) : {}",
-                    currentTimeMillis, lastHeartbeat, workerTimeoutMills);
-
-            if (currentTimeMillis - lastHeartbeat > workerTimeoutMills) {
-                bolt.die(new RuntimeException("subprocess heartbeat timeout"));
-            }
-
-            sendHeartbeatFlag.compareAndSet(false, true);
-        }
-    }
-
-    /** Reads messages from the subprocess and dispatches them by command until the bolt stops. */
-    private class BoltReaderRunnable implements Runnable {
-        public void run() {
-            while (_running) {
-                try {
-                    ShellMsg shellMsg = _process.readShellMsg();
-
-                    String command = shellMsg.getCommand();
-                    if (command == null) {
-                        throw new IllegalArgumentException("Command not found in bolt message: " + shellMsg);
-                    }
-                    if (command.equals("sync")) {
-                        setHeartbeat();
-                    } else if (command.equals("ack")) {
-                        handleAck(shellMsg.getId());
-                    } else if (command.equals("fail")) {
-                        handleFail(shellMsg.getId());
-                    } else if (command.equals("error")) {
-                        handleError(shellMsg.getMsg());
-                    } else if (command.equals("log")) {
-                        handleLog(shellMsg);
-                    } else if (command.equals("emit")) {
-                        handleEmit(shellMsg);
-                    } else if (command.equals("metrics")) {
-                        handleMetrics(shellMsg);
-                    }
-                } catch (InterruptedException ignored) {
-                    // Deliberately ignored: cleanup() interrupts this thread after
-                    // clearing _running, so the loop condition ends the thread.
-                } catch (Throwable t) {
-                    die(t);
-                }
-            }
-        }
-    }
-
-    /** Writes heartbeats and queued messages to the subprocess until the bolt stops. */
-    private class BoltWriterRunnable implements Runnable {
-        public void run() {
-            while (_running) {
-                try {
-                    if (sendHeartbeatFlag.get()) {
-                        LOG.debug("BOLT - sending heartbeat request to subprocess");
-
-                        String genId = Long.toString(_rand.nextLong());
-                        _process.writeBoltMsg(createHeartbeatBoltMessage(genId));
-                        sendHeartbeatFlag.compareAndSet(true, false);
-                    }
-
-                    // wait at most one second so heartbeats are sent even when idle
-                    Object write = _pendingWrites.poll(1, SECONDS);
-                    if (write instanceof BoltMsg) {
-                        _process.writeBoltMsg((BoltMsg) write);
-                    } else if (write instanceof List<?>) {
-                        // the only lists ever queued are the task ids from handleEmit
-                        @SuppressWarnings("unchecked")
-                        List<Integer> taskIds = (List<Integer>) write;
-                        _process.writeTaskIds(taskIds);
-                    } else if (write != null) {
-                        throw new RuntimeException("Unknown class type to write: " + write.getClass().getName());
-                    }
-                } catch (InterruptedException ignored) {
-                    // Deliberately ignored: cleanup() interrupts this thread after
-                    // clearing _running, so the loop condition ends the thread.
-                } catch (Throwable t) {
-                    die(t);
-                }
-            }
-        }
-
-        /** Builds the empty system-task tuple on the heartbeat stream. */
-        private BoltMsg createHeartbeatBoltMessage(String genId) {
-            BoltMsg msg = new BoltMsg();
-            msg.setId(genId);
-            msg.setTask(Constants.SYSTEM_TASK_ID);
-            msg.setStream(HEARTBEAT_STREAM_ID);
-            msg.setTuple(new ArrayList<Object>());
-            return msg;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/task/TopologyContext.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/task/TopologyContext.java b/jstorm-client/src/main/java/backtype/storm/task/TopologyContext.java
deleted file mode 100644
index 34ef4fa..0000000
--- a/jstorm-client/src/main/java/backtype/storm/task/TopologyContext.java
+++ /dev/null
@@ -1,317 +0,0 @@
-package backtype.storm.task;
-
-import backtype.storm.generated.GlobalStreamId;
-import backtype.storm.generated.Grouping;
-import backtype.storm.generated.StormTopology;
-import backtype.storm.hooks.ITaskHook;
-import backtype.storm.metric.api.IMetric;
-import backtype.storm.metric.api.IReducer;
-import backtype.storm.metric.api.ICombiner;
-import backtype.storm.metric.api.ReducedMetric;
-import backtype.storm.metric.api.CombinedMetric;
-import backtype.storm.state.ISubscribedState;
-import backtype.storm.tuple.Fields;
-import backtype.storm.utils.Utils;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import org.apache.commons.lang.NotImplementedException;
-
-/**
- * A TopologyContext is given to bolts and spouts in their "prepare" and "open"
- * methods, respectively. This object provides information about the component's
- * place within the topology, such as task ids, inputs and outputs, etc.
- * 
- * <p>
- * The TopologyContext is also used to declare ISubscribedState objects to
- * synchronize state with StateSpouts this object is subscribed to.
- * </p>
- */
-public class TopologyContext extends WorkerTopologyContext implements
-		IMetricsContext {
-	private Integer _taskId;
-	private Map<String, Object> _taskData = new HashMap<String, Object>();
-	private List<ITaskHook> _hooks = new ArrayList<ITaskHook>();
-	private Map<String, Object> _executorData;
-	/** Registered metrics: time bucket size (secs) -> task id -> metric name -> metric. */
-	private Map<Integer, Map<Integer, Map<String, IMetric>>> _registeredMetrics;
-	private clojure.lang.Atom _openOrPrepareWasCalled;
-
-	public TopologyContext(StormTopology topology, Map stormConf,
-			Map<Integer, String> taskToComponent,
-			Map<String, List<Integer>> componentToSortedTasks,
-			Map<String, Map<String, Fields>> componentToStreamToFields,
-			String topologyId, String codeDir, String pidDir, Integer taskId,
-			Integer workerPort, List<Integer> workerTasks,
-			Map<String, Object> defaultResources,
-			Map<String, Object> userResources,
-			Map<String, Object> executorData, Map registeredMetrics,
-			clojure.lang.Atom openOrPrepareWasCalled) {
-		super(topology, stormConf, taskToComponent, componentToSortedTasks,
-				componentToStreamToFields, topologyId, codeDir, pidDir,
-				workerPort, workerTasks, defaultResources, userResources);
-		_taskId = taskId;
-		_executorData = executorData;
-		_registeredMetrics = registeredMetrics;
-		_openOrPrepareWasCalled = openOrPrepareWasCalled;
-	}
-
-	/**
-	 * All state from all subscribed state spouts streams will be synced with
-	 * the provided object.
-	 * 
-	 * <p>
-	 * It is recommended that your ISubscribedState object is kept as an
-	 * instance variable of this object. The recommended usage of this method is
-	 * as follows:
-	 * </p>
-	 * 
-	 * <p>
-	 * _myState = context.setAllSubscribedState(new MyState());
-	 * </p>
-	 * 
-	 * <p>
-	 * Not implemented: this method currently always throws.
-	 * </p>
-	 * 
-	 * @param obj
-	 *            Provided ISubscribedState implementation
-	 * @return Returns the ISubscribedState object provided
-	 * @throws NotImplementedException
-	 *             always
-	 */
-	public <T extends ISubscribedState> T setAllSubscribedState(T obj) {
-		// check that only subscribed to one component/stream for statespout
-		// setsubscribedstate appropriately
-		throw new NotImplementedException();
-	}
-
-	/**
-	 * Synchronizes the default stream from the specified state spout component
-	 * id with the provided ISubscribedState object.
-	 * 
-	 * <p>
-	 * The recommended usage of this method is as follows:
-	 * </p>
-	 * <p>
-	 * _myState = context.setSubscribedState(componentId, new MyState());
-	 * </p>
-	 * 
-	 * @param componentId
-	 *            the id of the StateSpout component to subscribe to
-	 * @param obj
-	 *            Provided ISubscribedState implementation
-	 * @return Returns the ISubscribedState object provided
-	 */
-	public <T extends ISubscribedState> T setSubscribedState(
-			String componentId, T obj) {
-		return setSubscribedState(componentId, Utils.DEFAULT_STREAM_ID, obj);
-	}
-
-	/**
-	 * Synchronizes the specified stream from the specified state spout
-	 * component id with the provided ISubscribedState object.
-	 * 
-	 * <p>
-	 * The recommended usage of this method is as follows:
-	 * </p>
-	 * <p>
-	 * _myState = context.setSubscribedState(componentId, streamId, new
-	 * MyState());
-	 * </p>
-	 * 
-	 * <p>
-	 * Not implemented: this method currently always throws.
-	 * </p>
-	 * 
-	 * @param componentId
-	 *            the id of the StateSpout component to subscribe to
-	 * @param streamId
-	 *            the stream to subscribe to
-	 * @param obj
-	 *            Provided ISubscribedState implementation
-	 * @return Returns the ISubscribedState object provided
-	 * @throws NotImplementedException
-	 *             always
-	 */
-	public <T extends ISubscribedState> T setSubscribedState(
-			String componentId, String streamId, T obj) {
-		throw new NotImplementedException();
-	}
-
-	/**
-	 * Gets the task id of this task.
-	 * 
-	 * @return the task id
-	 */
-	public int getThisTaskId() {
-		return _taskId;
-	}
-
-	/**
-	 * Gets the component id for this task. The component id maps to a component
-	 * id specified for a Spout or Bolt in the topology definition.
-	 * 
-	 * @return the component id this task belongs to
-	 */
-	public String getThisComponentId() {
-		return getComponentId(_taskId);
-	}
-
-	/**
-	 * Gets the declared output fields for the specified stream id for the
-	 * component this task is a part of.
-	 */
-	public Fields getThisOutputFields(String streamId) {
-		return getComponentOutputFields(getThisComponentId(), streamId);
-	}
-
-	/**
-	 * Gets the set of streams declared for the component of this task.
-	 */
-	public Set<String> getThisStreams() {
-		return getComponentStreams(getThisComponentId());
-	}
-
-	/**
-	 * Gets the index of this task id in
-	 * getComponentTasks(getThisComponentId()). An example use case for this
-	 * method is determining which task accesses which resource in a distributed
-	 * resource to ensure an even distribution.
-	 * 
-	 * @return the position of this task id among the sorted task ids of its
-	 *         component
-	 * @throws RuntimeException
-	 *             if this task id is not among the component's tasks
-	 */
-	public int getThisTaskIndex() {
-		// sort a copy so the list owned by the context is left untouched
-		List<Integer> tasks = new ArrayList<Integer>(
-				getComponentTasks(getThisComponentId()));
-		Collections.sort(tasks);
-		int thisTaskId = getThisTaskId();
-		for (int i = 0; i < tasks.size(); i++) {
-			if (tasks.get(i).intValue() == thisTaskId) {
-				return i;
-			}
-		}
-		throw new RuntimeException(
-				"Fatal: could not find this task id in this component");
-	}
-
-	/**
-	 * Gets the declared inputs to this component.
-	 * 
-	 * @return A map from subscribed component/stream to the grouping subscribed
-	 *         with.
-	 */
-	public Map<GlobalStreamId, Grouping> getThisSources() {
-		return getSources(getThisComponentId());
-	}
-
-	/**
-	 * Gets information about who is consuming the outputs of this component,
-	 * and how.
-	 * 
-	 * @return Map from stream id to component id to the Grouping used.
-	 */
-	public Map<String, Map<String, Grouping>> getThisTargets() {
-		return getTargets(getThisComponentId());
-	}
-
-	/** Stores a value under the given name in this context's task data map. */
-	public void setTaskData(String name, Object data) {
-		_taskData.put(name, data);
-	}
-
-	/** Returns the task data stored under the given name, or null if there is none. */
-	public Object getTaskData(String name) {
-		return _taskData.get(name);
-	}
-
-	/** Stores a value under the given name in the executor data map passed to the constructor. */
-	public void setExecutorData(String name, Object data) {
-		_executorData.put(name, data);
-	}
-
-	/** Returns the executor data stored under the given name, or null if there is none. */
-	public Object getExecutorData(String name) {
-		return _executorData.get(name);
-	}
-
-	/** Prepares the hook with this context's configuration and then adds it to the hook list. */
-	public void addTaskHook(ITaskHook hook) {
-		hook.prepare(_stormConf, this);
-		_hooks.add(hook);
-	}
-
-	/** Returns the hooks added so far (the internal list itself, not a copy). */
-	public Collection<ITaskHook> getHooks() {
-		return _hooks;
-	}
-
-	/**
-	 * Register a IMetric instance. Storm will then call getValueAndReset on the
-	 * metric every timeBucketSizeInSecs and the returned value is sent to all
-	 * metrics consumers. You must call this during IBolt::prepare or
-	 * ISpout::open.
-	 * 
-	 * @param name
-	 *            the metric name; must not already be registered for this task
-	 * @param metric
-	 *            the metric to register; must not be null
-	 * @param timeBucketSizeInSecs
-	 *            reporting interval in seconds; must be at least 1
-	 * @return The IMetric argument unchanged.
-	 * @throws IllegalArgumentException
-	 *             if metric is null or timeBucketSizeInSecs is not positive
-	 * @throws RuntimeException
-	 *             if called after prepare/open has finished, or if the name is
-	 *             already registered
-	 */
-	public <T extends IMetric> T registerMetric(String name, T metric,
-			int timeBucketSizeInSecs) {
-		if ((Boolean) _openOrPrepareWasCalled.deref()) {
-			throw new RuntimeException(
-					"TopologyContext.registerMetric can only be called from within overridden "
-							+ "IBolt::prepare() or ISpout::open() method.");
-		}
-
-		if (metric == null) {
-			throw new IllegalArgumentException("Cannot register a null metric");
-		}
-
-		if (timeBucketSizeInSecs <= 0) {
-			throw new IllegalArgumentException("TopologyContext.registerMetric can only be called with timeBucketSizeInSecs " +
-					"greater than or equal to 1 second.");
-		}
-
-		// a name must be unique for this task across every time bucket
-		if (getRegisteredMetricByName(name) != null) {
-			throw new RuntimeException("The same metric name `" + name + "` was registered twice." );
-		}
-
-		Map<Integer, Map<String, IMetric>> taskToMetrics = _registeredMetrics
-				.get(timeBucketSizeInSecs);
-		if (taskToMetrics == null) {
-			taskToMetrics = new HashMap<Integer, Map<String, IMetric>>();
-			_registeredMetrics.put(timeBucketSizeInSecs, taskToMetrics);
-		}
-
-		Map<String, IMetric> nameToMetric = taskToMetrics.get(_taskId);
-		if (nameToMetric == null) {
-			nameToMetric = new HashMap<String, IMetric>();
-			taskToMetrics.put(_taskId, nameToMetric);
-		}
-
-		if (nameToMetric.containsKey(name)) {
-			throw new RuntimeException("The same metric name `" + name
-					+ "` was registered twice.");
-		}
-		nameToMetric.put(name, metric);
-
-		return metric;
-	}
-
-	/**
-	 * Get component's metric from registered metrics by name.
-	 * Notice: Normally, one component can only register one metric name once.
-	 *         But now registerMetric has a bug(https://issues.apache.org/jira/browse/STORM-254)
-	 *         cause the same metric name can register twice.
-	 *         So we just return the first metric we meet.
-	 * 
-	 * @param name
-	 *            the metric name to look up
-	 * @return the first metric registered under that name for this task, or
-	 *         null if there is none
-	 */
-	public IMetric getRegisteredMetricByName(String name) {
-		for (Map<Integer, Map<String, IMetric>> taskIdToNameToMetric : _registeredMetrics.values()) {
-			Map<String, IMetric> nameToMetric = taskIdToNameToMetric.get(_taskId);
-			if (nameToMetric == null) {
-				continue;
-			}
-			IMetric metric = nameToMetric.get(name);
-			if (metric != null) {
-				// we just return the first metric we meet
-				return metric;
-			}
-		}
-		return null;
-	}
-
-	/**
-	 * Convenience method for registering ReducedMetric.
-	 */
-	public ReducedMetric registerMetric(String name, IReducer reducer, int timeBucketSizeInSecs) {
-		return registerMetric(name, new ReducedMetric(reducer), timeBucketSizeInSecs);
-	}
-
-	/**
-	 * Convenience method for registering CombinedMetric.
-	 */
-	public CombinedMetric registerMetric(String name, ICombiner combiner, int timeBucketSizeInSecs) {
-		return registerMetric(name, new CombinedMetric(combiner), timeBucketSizeInSecs);
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/task/WorkerTopologyContext.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/task/WorkerTopologyContext.java b/jstorm-client/src/main/java/backtype/storm/task/WorkerTopologyContext.java
deleted file mode 100644
index 42e88dc..0000000
--- a/jstorm-client/src/main/java/backtype/storm/task/WorkerTopologyContext.java
+++ /dev/null
@@ -1,84 +0,0 @@
-package backtype.storm.task;
-
-import backtype.storm.generated.StormTopology;
-import backtype.storm.tuple.Fields;
-import java.io.File;
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-
-public class WorkerTopologyContext extends GeneralTopologyContext {
-	/** Key under which the shared executor is looked up in the default resources. */
-	public static final String SHARED_EXECUTOR = "executor";
-
-	private Integer _workerPort;
-	private List<Integer> _workerTasks;
-	private String _codeDir;
-	private String _pidDir;
-	Map<String, Object> _userResources;
-	Map<String, Object> _defaultResources;
-
-	/**
-	 * Creates the context for one worker.
-	 * 
-	 * @param pidDir
-	 *            directory for subprocess PID files; stored as its canonical
-	 *            path, or null if none is given
-	 * @throws RuntimeException
-	 *             if the canonical path of pidDir cannot be resolved
-	 */
-	public WorkerTopologyContext(StormTopology topology, Map stormConf,
-			Map<Integer, String> taskToComponent,
-			Map<String, List<Integer>> componentToSortedTasks,
-			Map<String, Map<String, Fields>> componentToStreamToFields,
-			String topologyId, String codeDir, String pidDir, Integer workerPort,
-			List<Integer> workerTasks, Map<String, Object> defaultResources,
-			Map<String, Object> userResources) {
-		super(topology, stormConf, taskToComponent, componentToSortedTasks,
-				componentToStreamToFields, topologyId);
-		_codeDir = codeDir;
-		_defaultResources = defaultResources;
-		_userResources = userResources;
-		try {
-			_pidDir = (pidDir == null) ? null : new File(pidDir).getCanonicalPath();
-		} catch (IOException e) {
-			// report the argument: the _pidDir field has not been assigned yet
-			throw new RuntimeException("Could not get canonical path for "
-					+ pidDir, e);
-		}
-		_workerPort = workerPort;
-		_workerTasks = workerTasks;
-	}
-
-	/**
-	 * Gets all the task ids that are running in this worker process (including
-	 * the task for this task).
-	 */
-	public List<Integer> getThisWorkerTasks() {
-		return _workerTasks;
-	}
-
-	/** Gets the worker port passed to the constructor. */
-	public Integer getThisWorkerPort() {
-		return _workerPort;
-	}
-
-	/**
-	 * Gets the location of the external resources for this worker on the local
-	 * filesystem. These external resources typically include bolts implemented
-	 * in other languages, such as Ruby or Python.
-	 */
-	public String getCodeDir() {
-		return _codeDir;
-	}
-
-	/**
-	 * If this task spawns any subprocesses, those subprocesses must immediately
-	 * write their PID to this directory on the local filesystem to ensure that
-	 * Storm properly destroys that process when the worker is shutdown.
-	 */
-	public String getPIDDir() {
-		return _pidDir;
-	}
-
-	/** Looks up a user resource by name; returns null if none is stored under it. */
-	public Object getResource(String name) {
-		return _userResources.get(name);
-	}
-
-	/** Returns the default resource stored under {@link #SHARED_EXECUTOR}. */
-	public ExecutorService getSharedExecutor() {
-		return (ExecutorService) _defaultResources.get(SHARED_EXECUTOR);
-	}
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/testing/AckFailDelegate.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/testing/AckFailDelegate.java b/jstorm-client/src/main/java/backtype/storm/testing/AckFailDelegate.java
deleted file mode 100644
index 131dee7..0000000
--- a/jstorm-client/src/main/java/backtype/storm/testing/AckFailDelegate.java
+++ /dev/null
@@ -1,9 +0,0 @@
-package backtype.storm.testing;
-
-import java.io.Serializable;
-
-/**
- * Serializable callback notified with the id of each ack and each fail.
- */
-public interface AckFailDelegate extends Serializable {
-	/** Called with the id that was acked. */
-	void ack(Object id);
-
-	/** Called with the id that was failed. */
-	void fail(Object id);
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/testing/AckFailMapTracker.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/testing/AckFailMapTracker.java b/jstorm-client/src/main/java/backtype/storm/testing/AckFailMapTracker.java
deleted file mode 100644
index 68d334d..0000000
--- a/jstorm-client/src/main/java/backtype/storm/testing/AckFailMapTracker.java
+++ /dev/null
@@ -1,35 +0,0 @@
-package backtype.storm.testing;
-
-import backtype.storm.utils.RegisteredGlobalState;
-import java.util.HashSet;
-import java.util.Set;
-
-/**
- * AckFailDelegate that records acked and failed ids in two sets held in
- * RegisteredGlobalState, so each id can be queried afterwards.
- */
-public class AckFailMapTracker implements AckFailDelegate {
-
-	/** RegisteredGlobalState key of the set of acked ids. */
-	String _acked;
-	/** RegisteredGlobalState key of the set of failed ids. */
-	String _failed;
-
-	public AckFailMapTracker() {
-		_acked = RegisteredGlobalState.registerState(new HashSet<Object>());
-		_failed = RegisteredGlobalState.registerState(new HashSet<Object>());
-	}
-
-	/** Returns true if {@link #ack} has been called with this id. */
-	public boolean isAcked(Object id) {
-		return idSet(_acked).contains(id);
-	}
-
-	/** Returns true if {@link #fail} has been called with this id. */
-	public boolean isFailed(Object id) {
-		return idSet(_failed).contains(id);
-	}
-
-	@Override
-	public void ack(Object id) {
-		idSet(_acked).add(id);
-	}
-
-	@Override
-	public void fail(Object id) {
-		idSet(_failed).add(id);
-	}
-
-	/** Fetches the id set registered under the given global-state key. */
-	@SuppressWarnings("unchecked")
-	private static Set<Object> idSet(String stateKey) {
-		// safe: both keys were registered with a HashSet<Object> in the constructor
-		return (Set<Object>) RegisteredGlobalState.getState(stateKey);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/testing/AckTracker.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/testing/AckTracker.java b/jstorm-client/src/main/java/backtype/storm/testing/AckTracker.java
deleted file mode 100644
index 134f8f0..0000000
--- a/jstorm-client/src/main/java/backtype/storm/testing/AckTracker.java
+++ /dev/null
@@ -1,35 +0,0 @@
-package backtype.storm.testing;
-
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
-
-public class AckTracker implements AckFailDelegate {
-	private static Map<String, AtomicInteger> acks = new ConcurrentHashMap<String, AtomicInteger>();
-
-	private String _id;
-
-	public AckTracker() {
-		_id = UUID.randomUUID().toString();
-		acks.put(_id, new AtomicInteger(0));
-	}
-
-	@Override
-	public void ack(Object id) {
-		acks.get(_id).incrementAndGet();
-	}
-
-	@Override
-	public void fail(Object id) {
-	}
-
-	public int getNumAcks() {
-		return acks.get(_id).intValue();
-	}
-
-	public void resetNumAcks() {
-		acks.get(_id).set(0);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/testing/BatchNumberList.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/testing/BatchNumberList.java b/jstorm-client/src/main/java/backtype/storm/testing/BatchNumberList.java
deleted file mode 100644
index dd6530e..0000000
--- a/jstorm-client/src/main/java/backtype/storm/testing/BatchNumberList.java
+++ /dev/null
@@ -1,55 +0,0 @@
-package backtype.storm.testing;
-
-import backtype.storm.coordination.BatchOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseBatchBolt;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-public class BatchNumberList extends BaseBatchBolt {
-
-	@Override
-	public void declareOutputFields(OutputFieldsDeclarer declarer) {
-		declarer.declare(new Fields("word", "list"));
-	}
-
-	String _wordComponent;
-
-	public BatchNumberList(String wordComponent) {
-		_wordComponent = wordComponent;
-	}
-
-	String word = null;
-	List<Integer> intSet = new ArrayList<Integer>();
-	BatchOutputCollector _collector;
-
-	@Override
-	public void prepare(Map conf, TopologyContext context,
-			BatchOutputCollector collector, Object id) {
-		_collector = collector;
-	}
-
-	@Override
-	public void execute(Tuple tuple) {
-		if (tuple.getSourceComponent().equals(_wordComponent)) {
-			this.word = tuple.getString(1);
-		} else {
-			intSet.add(tuple.getInteger(1));
-		}
-	}
-
-	@Override
-	public void finishBatch() {
-		if (word != null) {
-			Collections.sort(intSet);
-			_collector.emit(new Values(word, intSet));
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/testing/BatchProcessWord.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/testing/BatchProcessWord.java b/jstorm-client/src/main/java/backtype/storm/testing/BatchProcessWord.java
deleted file mode 100644
index 21f316f..0000000
--- a/jstorm-client/src/main/java/backtype/storm/testing/BatchProcessWord.java
+++ /dev/null
@@ -1,23 +0,0 @@
-package backtype.storm.testing;
-
-import backtype.storm.topology.BasicOutputCollector;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseBasicBolt;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-
-public class BatchProcessWord extends BaseBasicBolt {
-
-	@Override
-	public void declareOutputFields(OutputFieldsDeclarer declarer) {
-		declarer.declare(new Fields("id", "size"));
-	}
-
-	@Override
-	public void execute(Tuple input, BasicOutputCollector collector) {
-		collector.emit(new Values(input.getValue(0), input.getString(1)
-				.length()));
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/testing/BatchRepeatA.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/testing/BatchRepeatA.java b/jstorm-client/src/main/java/backtype/storm/testing/BatchRepeatA.java
deleted file mode 100644
index 9cb06c3..0000000
--- a/jstorm-client/src/main/java/backtype/storm/testing/BatchRepeatA.java
+++ /dev/null
@@ -1,30 +0,0 @@
-package backtype.storm.testing;
-
-import backtype.storm.topology.BasicOutputCollector;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseBasicBolt;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-
-public class BatchRepeatA extends BaseBasicBolt {
-
-	@Override
-	public void execute(Tuple input, BasicOutputCollector collector) {
-		Object id = input.getValue(0);
-		String word = input.getString(1);
-		for (int i = 0; i < word.length(); i++) {
-			if (word.charAt(i) == 'a') {
-				collector.emit("multi", new Values(id, word.substring(0, i)));
-			}
-		}
-		collector.emit("single", new Values(id, word));
-	}
-
-	@Override
-	public void declareOutputFields(OutputFieldsDeclarer declarer) {
-		declarer.declareStream("multi", new Fields("id", "word"));
-		declarer.declareStream("single", new Fields("id", "word"));
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/testing/BoltTracker.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/testing/BoltTracker.java b/jstorm-client/src/main/java/backtype/storm/testing/BoltTracker.java
deleted file mode 100644
index fb928c8..0000000
--- a/jstorm-client/src/main/java/backtype/storm/testing/BoltTracker.java
+++ /dev/null
@@ -1,24 +0,0 @@
-package backtype.storm.testing;
-
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import java.util.HashMap;
-import java.util.Map;
-
-public class BoltTracker extends NonRichBoltTracker implements IRichBolt {
-	IRichBolt _richDelegate;
-
-	public BoltTracker(IRichBolt delegate, String id) {
-		super(delegate, id);
-		_richDelegate = delegate;
-	}
-
-	public void declareOutputFields(OutputFieldsDeclarer declarer) {
-		_richDelegate.declareOutputFields(declarer);
-	}
-
-	@Override
-	public Map<String, Object> getComponentConfiguration() {
-		return new HashMap<String, Object>();
-	}
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/testing/CompleteTopologyParam.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/testing/CompleteTopologyParam.java b/jstorm-client/src/main/java/backtype/storm/testing/CompleteTopologyParam.java
deleted file mode 100644
index 4017ddb..0000000
--- a/jstorm-client/src/main/java/backtype/storm/testing/CompleteTopologyParam.java
+++ /dev/null
@@ -1,57 +0,0 @@
-package backtype.storm.testing;
-
-import backtype.storm.Config;
-
-/**
- * The param class for the <code>Testing.completeTopology</code>.
- */
-public class CompleteTopologyParam {
-	/**
-	 * The mocked spout sources
-	 */
-	private MockedSources mockedSources;
-	/**
-	 * the config for the topology when it was submitted to the cluster
-	 */
-	private Config stormConf;
-	/**
-	 * whether cleanup the state?
-	 */
-	private Boolean cleanupState;
-	/**
-	 * the topology name you want to submit to the cluster
-	 */
-	private String topologyName;
-
-	public MockedSources getMockedSources() {
-		return mockedSources;
-	}
-
-	public void setMockedSources(MockedSources mockedSources) {
-		this.mockedSources = mockedSources;
-	}
-
-	public Config getStormConf() {
-		return stormConf;
-	}
-
-	public void setStormConf(Config stormConf) {
-		this.stormConf = stormConf;
-	}
-
-	public Boolean getCleanupState() {
-		return cleanupState;
-	}
-
-	public void setCleanupState(Boolean cleanupState) {
-		this.cleanupState = cleanupState;
-	}
-
-	public String getTopologyName() {
-		return topologyName;
-	}
-
-	public void setTopologyName(String topologyName) {
-		this.topologyName = topologyName;
-	}
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/testing/CountingBatchBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/testing/CountingBatchBolt.java b/jstorm-client/src/main/java/backtype/storm/testing/CountingBatchBolt.java
deleted file mode 100644
index 75dca1a..0000000
--- a/jstorm-client/src/main/java/backtype/storm/testing/CountingBatchBolt.java
+++ /dev/null
@@ -1,39 +0,0 @@
-package backtype.storm.testing;
-
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseBatchBolt;
-import backtype.storm.coordination.BatchOutputCollector;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-import java.util.Map;
-
-public class CountingBatchBolt extends BaseBatchBolt {
-	BatchOutputCollector _collector;
-	Object _id;
-	int _count = 0;
-
-	@Override
-	public void prepare(Map conf, TopologyContext context,
-			BatchOutputCollector collector, Object id) {
-		_collector = collector;
-		_id = id;
-	}
-
-	@Override
-	public void execute(Tuple tuple) {
-		_count++;
-	}
-
-	@Override
-	public void finishBatch() {
-		_collector.emit(new Values(_id, _count));
-	}
-
-	@Override
-	public void declareOutputFields(OutputFieldsDeclarer declarer) {
-		declarer.declare(new Fields("tx", "count"));
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/testing/CountingCommitBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/testing/CountingCommitBolt.java b/jstorm-client/src/main/java/backtype/storm/testing/CountingCommitBolt.java
deleted file mode 100644
index e3533a6..0000000
--- a/jstorm-client/src/main/java/backtype/storm/testing/CountingCommitBolt.java
+++ /dev/null
@@ -1,42 +0,0 @@
-package backtype.storm.testing;
-
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.coordination.BatchOutputCollector;
-import backtype.storm.topology.base.BaseTransactionalBolt;
-import backtype.storm.transactional.ICommitter;
-import backtype.storm.transactional.TransactionAttempt;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-import java.util.Map;
-
-public class CountingCommitBolt extends BaseTransactionalBolt implements
-		ICommitter {
-	BatchOutputCollector _collector;
-	TransactionAttempt _id;
-	int _count = 0;
-
-	@Override
-	public void prepare(Map conf, TopologyContext context,
-			BatchOutputCollector collector, TransactionAttempt id) {
-		_id = id;
-		_collector = collector;
-	}
-
-	@Override
-	public void execute(Tuple tuple) {
-		_count++;
-	}
-
-	@Override
-	public void finishBatch() {
-		_collector.emit(new Values(_id, _count));
-	}
-
-	@Override
-	public void declareOutputFields(OutputFieldsDeclarer declarer) {
-		declarer.declare(new Fields("tx", "count"));
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/testing/FeederSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/testing/FeederSpout.java b/jstorm-client/src/main/java/backtype/storm/testing/FeederSpout.java
deleted file mode 100644
index 871e573..0000000
--- a/jstorm-client/src/main/java/backtype/storm/testing/FeederSpout.java
+++ /dev/null
@@ -1,83 +0,0 @@
-package backtype.storm.testing;
-
-import backtype.storm.topology.OutputFieldsDeclarer;
-import java.util.Map;
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.base.BaseRichSpout;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Values;
-import backtype.storm.utils.InprocMessaging;
-import java.util.HashMap;
-import java.util.List;
-import java.util.UUID;
-
-public class FeederSpout extends BaseRichSpout {
-	private int _id;
-	private Fields _outFields;
-	private SpoutOutputCollector _collector;
-	private AckFailDelegate _ackFailDelegate;
-
-	public FeederSpout(Fields outFields) {
-		_id = InprocMessaging.acquireNewPort();
-		_outFields = outFields;
-	}
-
-	public void setAckFailDelegate(AckFailDelegate d) {
-		_ackFailDelegate = d;
-	}
-
-	public void feed(List<Object> tuple) {
-		feed(tuple, UUID.randomUUID().toString());
-	}
-
-	public void feed(List<Object> tuple, Object msgId) {
-		InprocMessaging.sendMessage(_id, new Values(tuple, msgId));
-	}
-
-	public void open(Map conf, TopologyContext context,
-			SpoutOutputCollector collector) {
-		_collector = collector;
-	}
-
-	public void close() {
-
-	}
-
-	public void nextTuple() {
-		List<Object> toEmit = (List<Object>) InprocMessaging.pollMessage(_id);
-		if (toEmit != null) {
-			List<Object> tuple = (List<Object>) toEmit.get(0);
-			Object msgId = toEmit.get(1);
-
-			_collector.emit(tuple, msgId);
-		} else {
-			try {
-				Thread.sleep(1);
-			} catch (InterruptedException e) {
-				throw new RuntimeException(e);
-			}
-		}
-	}
-
-	public void ack(Object msgId) {
-		if (_ackFailDelegate != null) {
-			_ackFailDelegate.ack(msgId);
-		}
-	}
-
-	public void fail(Object msgId) {
-		if (_ackFailDelegate != null) {
-			_ackFailDelegate.fail(msgId);
-		}
-	}
-
-	public void declareOutputFields(OutputFieldsDeclarer declarer) {
-		declarer.declare(_outFields);
-	}
-
-	@Override
-	public Map<String, Object> getComponentConfiguration() {
-		return new HashMap<String, Object>();
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/testing/FixedTuple.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/testing/FixedTuple.java b/jstorm-client/src/main/java/backtype/storm/testing/FixedTuple.java
deleted file mode 100644
index e4cc089..0000000
--- a/jstorm-client/src/main/java/backtype/storm/testing/FixedTuple.java
+++ /dev/null
@@ -1,25 +0,0 @@
-package backtype.storm.testing;
-
-import backtype.storm.utils.Utils;
-import java.io.Serializable;
-import java.util.List;
-
-public class FixedTuple implements Serializable {
-	public String stream;
-	public List<Object> values;
-
-	public FixedTuple(List<Object> values) {
-		this.stream = Utils.DEFAULT_STREAM_ID;
-		this.values = values;
-	}
-
-	public FixedTuple(String stream, List<Object> values) {
-		this.stream = stream;
-		this.values = values;
-	}
-
-	@Override
-	public String toString() {
-		return stream + ":" + "<" + values.toString() + ">";
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/testing/FixedTupleSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/testing/FixedTupleSpout.java b/jstorm-client/src/main/java/backtype/storm/testing/FixedTupleSpout.java
deleted file mode 100644
index e463df0..0000000
--- a/jstorm-client/src/main/java/backtype/storm/testing/FixedTupleSpout.java
+++ /dev/null
@@ -1,164 +0,0 @@
-package backtype.storm.testing;
-
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichSpout;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.utils.Utils;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import static backtype.storm.utils.Utils.get;
-
-public class FixedTupleSpout implements IRichSpout {
-	private static final Map<String, Integer> acked = new HashMap<String, Integer>();
-	private static final Map<String, Integer> failed = new HashMap<String, Integer>();
-
-	public static int getNumAcked(String stormId) {
-		synchronized (acked) {
-			return get(acked, stormId, 0);
-		}
-	}
-
-	public static int getNumFailed(String stormId) {
-		synchronized (failed) {
-			return get(failed, stormId, 0);
-		}
-	}
-
-	public static void clear(String stormId) {
-		acked.remove(stormId);
-		failed.remove(stormId);
-	}
-
-	private List<FixedTuple> _tuples;
-	private SpoutOutputCollector _collector;
-
-	private TopologyContext _context;
-	private List<FixedTuple> _serveTuples;
-	private Map<String, FixedTuple> _pending;
-
-	private String _id;
-	private String _fieldName;
-
-	public FixedTupleSpout(List tuples) {
-		this(tuples, null);
-	}
-
-	public FixedTupleSpout(List tuples, String fieldName) {
-		_id = UUID.randomUUID().toString();
-		synchronized (acked) {
-			acked.put(_id, 0);
-		}
-		synchronized (failed) {
-			failed.put(_id, 0);
-		}
-		_tuples = new ArrayList<FixedTuple>();
-		for (Object o : tuples) {
-			FixedTuple ft;
-			if (o instanceof FixedTuple) {
-				ft = (FixedTuple) o;
-			} else {
-				ft = new FixedTuple((List) o);
-			}
-			_tuples.add(ft);
-		}
-		_fieldName = fieldName;
-	}
-
-	public List<FixedTuple> getSourceTuples() {
-		return _tuples;
-	}
-
-	public int getCompleted() {
-		int ackedAmt;
-		int failedAmt;
-
-		synchronized (acked) {
-			ackedAmt = acked.get(_id);
-		}
-		synchronized (failed) {
-			failedAmt = failed.get(_id);
-		}
-		return ackedAmt + failedAmt;
-	}
-
-	public void cleanup() {
-		synchronized (acked) {
-			acked.remove(_id);
-		}
-		synchronized (failed) {
-			failed.remove(_id);
-		}
-	}
-
-	public void open(Map conf, TopologyContext context,
-			SpoutOutputCollector collector) {
-		_context = context;
-		List<Integer> tasks = context.getComponentTasks(context
-				.getThisComponentId());
-		int startIndex;
-		for (startIndex = 0; startIndex < tasks.size(); startIndex++) {
-			if (tasks.get(startIndex) == context.getThisTaskId()) {
-				break;
-			}
-		}
-		_collector = collector;
-		_pending = new HashMap<String, FixedTuple>();
-		_serveTuples = new ArrayList<FixedTuple>();
-		for (int i = startIndex; i < _tuples.size(); i += tasks.size()) {
-			_serveTuples.add(_tuples.get(i));
-		}
-	}
-
-	public void close() {
-	}
-
-	public void nextTuple() {
-		if (_serveTuples.size() > 0) {
-			FixedTuple ft = _serveTuples.remove(0);
-			String id = UUID.randomUUID().toString();
-			_pending.put(id, ft);
-			_collector.emit(ft.stream, ft.values, id);
-		} else {
-			Utils.sleep(100);
-		}
-	}
-
-	public void ack(Object msgId) {
-		synchronized (acked) {
-			int curr = get(acked, _id, 0);
-			acked.put(_id, curr + 1);
-		}
-	}
-
-	public void fail(Object msgId) {
-		synchronized (failed) {
-			int curr = get(failed, _id, 0);
-			failed.put(_id, curr + 1);
-		}
-	}
-
-	@Override
-	public void activate() {
-	}
-
-	@Override
-	public void deactivate() {
-	}
-
-	@Override
-	public void declareOutputFields(OutputFieldsDeclarer declarer) {
-		if (_fieldName != null) {
-			declarer.declare(new Fields(_fieldName));
-		}
-	}
-
-	@Override
-	public Map<String, Object> getComponentConfiguration() {
-		return null;
-	}
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/testing/IdentityBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/testing/IdentityBolt.java b/jstorm-client/src/main/java/backtype/storm/testing/IdentityBolt.java
deleted file mode 100644
index b3f8d87..0000000
--- a/jstorm-client/src/main/java/backtype/storm/testing/IdentityBolt.java
+++ /dev/null
@@ -1,25 +0,0 @@
-package backtype.storm.testing;
-
-import backtype.storm.topology.BasicOutputCollector;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseBasicBolt;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-
-public class IdentityBolt extends BaseBasicBolt {
-	Fields _fields;
-
-	public IdentityBolt(Fields fields) {
-		_fields = fields;
-	}
-
-	@Override
-	public void execute(Tuple input, BasicOutputCollector collector) {
-		collector.emit(input.getValues());
-	}
-
-	@Override
-	public void declareOutputFields(OutputFieldsDeclarer declarer) {
-		declarer.declare(_fields);
-	}
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/testing/KeyedCountingBatchBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/testing/KeyedCountingBatchBolt.java b/jstorm-client/src/main/java/backtype/storm/testing/KeyedCountingBatchBolt.java
deleted file mode 100644
index 58ae380..0000000
--- a/jstorm-client/src/main/java/backtype/storm/testing/KeyedCountingBatchBolt.java
+++ /dev/null
@@ -1,45 +0,0 @@
-package backtype.storm.testing;
-
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseBatchBolt;
-import backtype.storm.coordination.BatchOutputCollector;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-import backtype.storm.utils.Utils;
-import java.util.HashMap;
-import java.util.Map;
-
-public class KeyedCountingBatchBolt extends BaseBatchBolt {
-	BatchOutputCollector _collector;
-	Object _id;
-	Map<Object, Integer> _counts = new HashMap<Object, Integer>();
-
-	@Override
-	public void prepare(Map conf, TopologyContext context,
-			BatchOutputCollector collector, Object id) {
-		_collector = collector;
-		_id = id;
-	}
-
-	@Override
-	public void execute(Tuple tuple) {
-		Object key = tuple.getValue(1);
-		int curr = Utils.get(_counts, key, 0);
-		_counts.put(key, curr + 1);
-	}
-
-	@Override
-	public void finishBatch() {
-		for (Object key : _counts.keySet()) {
-			_collector.emit(new Values(_id, key, _counts.get(key)));
-		}
-	}
-
-	@Override
-	public void declareOutputFields(OutputFieldsDeclarer declarer) {
-		declarer.declare(new Fields("tx", "key", "count"));
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/testing/KeyedCountingCommitterBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/testing/KeyedCountingCommitterBolt.java b/jstorm-client/src/main/java/backtype/storm/testing/KeyedCountingCommitterBolt.java
deleted file mode 100644
index a170130..0000000
--- a/jstorm-client/src/main/java/backtype/storm/testing/KeyedCountingCommitterBolt.java
+++ /dev/null
@@ -1,8 +0,0 @@
-package backtype.storm.testing;
-
-import backtype.storm.transactional.ICommitter;
-
-public class KeyedCountingCommitterBolt extends KeyedCountingBatchBolt
-		implements ICommitter {
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/testing/KeyedSummingBatchBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/testing/KeyedSummingBatchBolt.java b/jstorm-client/src/main/java/backtype/storm/testing/KeyedSummingBatchBolt.java
deleted file mode 100644
index c12a319..0000000
--- a/jstorm-client/src/main/java/backtype/storm/testing/KeyedSummingBatchBolt.java
+++ /dev/null
@@ -1,45 +0,0 @@
-package backtype.storm.testing;
-
-import backtype.storm.coordination.BatchOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseBatchBolt;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-import backtype.storm.utils.Utils;
-import clojure.lang.Numbers;
-import java.util.HashMap;
-import java.util.Map;
-
-public class KeyedSummingBatchBolt extends BaseBatchBolt {
-	BatchOutputCollector _collector;
-	Object _id;
-	Map<Object, Number> _sums = new HashMap<Object, Number>();
-
-	@Override
-	public void prepare(Map conf, TopologyContext context,
-			BatchOutputCollector collector, Object id) {
-		_collector = collector;
-		_id = id;
-	}
-
-	@Override
-	public void execute(Tuple tuple) {
-		Object key = tuple.getValue(1);
-		Number curr = Utils.get(_sums, key, 0);
-		_sums.put(key, Numbers.add(curr, tuple.getValue(2)));
-	}
-
-	@Override
-	public void finishBatch() {
-		for (Object key : _sums.keySet()) {
-			_collector.emit(new Values(_id, key, _sums.get(key)));
-		}
-	}
-
-	@Override
-	public void declareOutputFields(OutputFieldsDeclarer declarer) {
-		declarer.declare(new Fields("tx", "key", "sum"));
-	}
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/testing/MemoryTransactionalSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/testing/MemoryTransactionalSpout.java b/jstorm-client/src/main/java/backtype/storm/testing/MemoryTransactionalSpout.java
deleted file mode 100644
index bf6286f..0000000
--- a/jstorm-client/src/main/java/backtype/storm/testing/MemoryTransactionalSpout.java
+++ /dev/null
@@ -1,187 +0,0 @@
-package backtype.storm.testing;
-
-import backtype.storm.Config;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.transactional.TransactionAttempt;
-import backtype.storm.coordination.BatchOutputCollector;
-import backtype.storm.transactional.partitioned.IPartitionedTransactionalSpout;
-import backtype.storm.transactional.partitioned.IPartitionedTransactionalSpout.Emitter;
-import backtype.storm.tuple.Fields;
-import backtype.storm.utils.RegisteredGlobalState;
-import backtype.storm.utils.Utils;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class MemoryTransactionalSpout implements
-		IPartitionedTransactionalSpout<MemoryTransactionalSpoutMeta> {
-	public static String TX_FIELD = MemoryTransactionalSpout.class.getName()
-			+ "/id";
-
-	private String _id;
-	private String _finishedPartitionsId;
-	private int _takeAmt;
-	private Fields _outFields;
-	private Map<Integer, List<List<Object>>> _initialPartitions;
-
-	public MemoryTransactionalSpout(
-			Map<Integer, List<List<Object>>> partitions, Fields outFields,
-			int takeAmt) {
-		_id = RegisteredGlobalState.registerState(partitions);
-		Map<Integer, Boolean> finished = Collections
-				.synchronizedMap(new HashMap<Integer, Boolean>());
-		_finishedPartitionsId = RegisteredGlobalState.registerState(finished);
-		_takeAmt = takeAmt;
-		_outFields = outFields;
-		_initialPartitions = partitions;
-	}
-
-	public boolean isExhaustedTuples() {
-		Map<Integer, Boolean> statuses = getFinishedStatuses();
-		for (Integer partition : getQueues().keySet()) {
-			if (!statuses.containsKey(partition)
-					|| !getFinishedStatuses().get(partition)) {
-				return false;
-			}
-		}
-		return true;
-	}
-
-	class Coordinator implements IPartitionedTransactionalSpout.Coordinator {
-
-		@Override
-		public int numPartitions() {
-			return getQueues().size();
-		}
-
-		@Override
-		public boolean isReady() {
-			return true;
-		}
-
-		@Override
-		public void close() {
-		}
-	}
-
-	class Emitter
-			implements
-			IPartitionedTransactionalSpout.Emitter<MemoryTransactionalSpoutMeta> {
-
-		Integer _maxSpoutPending;
-		Map<Integer, Integer> _emptyPartitions = new HashMap<Integer, Integer>();
-
-		public Emitter(Map conf) {
-			Object c = conf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING);
-			if (c == null)
-				_maxSpoutPending = 1;
-			else
-				_maxSpoutPending = Utils.getInt(c);
-		}
-
-		@Override
-		public MemoryTransactionalSpoutMeta emitPartitionBatchNew(
-				TransactionAttempt tx, BatchOutputCollector collector,
-				int partition, MemoryTransactionalSpoutMeta lastPartitionMeta) {
-			int index;
-			if (lastPartitionMeta == null) {
-				index = 0;
-			} else {
-				index = lastPartitionMeta.index + lastPartitionMeta.amt;
-			}
-			List<List<Object>> queue = getQueues().get(partition);
-			int total = queue.size();
-			int left = total - index;
-			int toTake = Math.min(left, _takeAmt);
-
-			MemoryTransactionalSpoutMeta ret = new MemoryTransactionalSpoutMeta(
-					index, toTake);
-			emitPartitionBatch(tx, collector, partition, ret);
-			if (toTake == 0) {
-				// this is a pretty hacky way to determine when all the
-				// partitions have been committed
-				// wait until we've emitted max-spout-pending empty partitions
-				// for the partition
-				int curr = Utils.get(_emptyPartitions, partition, 0) + 1;
-				_emptyPartitions.put(partition, curr);
-				if (curr > _maxSpoutPending) {
-					Map<Integer, Boolean> finishedStatuses = getFinishedStatuses();
-					// will be null in remote mode
-					if (finishedStatuses != null) {
-						finishedStatuses.put(partition, true);
-					}
-				}
-			}
-			return ret;
-		}
-
-		@Override
-		public void emitPartitionBatch(TransactionAttempt tx,
-				BatchOutputCollector collector, int partition,
-				MemoryTransactionalSpoutMeta partitionMeta) {
-			List<List<Object>> queue = getQueues().get(partition);
-			for (int i = partitionMeta.index; i < partitionMeta.index
-					+ partitionMeta.amt; i++) {
-				List<Object> toEmit = new ArrayList<Object>(queue.get(i));
-				toEmit.add(0, tx);
-				collector.emit(toEmit);
-			}
-		}
-
-		@Override
-		public void close() {
-		}
-	}
-
-	@Override
-	public IPartitionedTransactionalSpout.Coordinator getCoordinator(Map conf,
-			TopologyContext context) {
-		return new Coordinator();
-	}
-
-	@Override
-	public IPartitionedTransactionalSpout.Emitter<MemoryTransactionalSpoutMeta> getEmitter(
-			Map conf, TopologyContext context) {
-		return new Emitter(conf);
-	}
-
-	@Override
-	public void declareOutputFields(OutputFieldsDeclarer declarer) {
-		List<String> toDeclare = new ArrayList<String>(_outFields.toList());
-		toDeclare.add(0, TX_FIELD);
-		declarer.declare(new Fields(toDeclare));
-	}
-
-	@Override
-	public Map<String, Object> getComponentConfiguration() {
-		Config conf = new Config();
-		conf.registerSerialization(MemoryTransactionalSpoutMeta.class);
-		return conf;
-	}
-
-	public void startup() {
-		getFinishedStatuses().clear();
-	}
-
-	public void cleanup() {
-		RegisteredGlobalState.clearState(_id);
-		RegisteredGlobalState.clearState(_finishedPartitionsId);
-	}
-
-	private Map<Integer, List<List<Object>>> getQueues() {
-		Map<Integer, List<List<Object>>> ret = (Map<Integer, List<List<Object>>>) RegisteredGlobalState
-				.getState(_id);
-		if (ret != null)
-			return ret;
-		else
-			return _initialPartitions;
-	}
-
-	private Map<Integer, Boolean> getFinishedStatuses() {
-		return (Map<Integer, Boolean>) RegisteredGlobalState
-				.getState(_finishedPartitionsId);
-	}
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/testing/MemoryTransactionalSpoutMeta.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/testing/MemoryTransactionalSpoutMeta.java b/jstorm-client/src/main/java/backtype/storm/testing/MemoryTransactionalSpoutMeta.java
deleted file mode 100644
index 4d87a66..0000000
--- a/jstorm-client/src/main/java/backtype/storm/testing/MemoryTransactionalSpoutMeta.java
+++ /dev/null
@@ -1,21 +0,0 @@
-package backtype.storm.testing;
-
-public class MemoryTransactionalSpoutMeta {
-	int index;
-	int amt;
-
-	// for kryo compatibility
-	public MemoryTransactionalSpoutMeta() {
-
-	}
-
-	public MemoryTransactionalSpoutMeta(int index, int amt) {
-		this.index = index;
-		this.amt = amt;
-	}
-
-	@Override
-	public String toString() {
-		return "index: " + index + "; amt: " + amt;
-	}
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/testing/MkClusterParam.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/testing/MkClusterParam.java b/jstorm-client/src/main/java/backtype/storm/testing/MkClusterParam.java
deleted file mode 100644
index 8ec7102..0000000
--- a/jstorm-client/src/main/java/backtype/storm/testing/MkClusterParam.java
+++ /dev/null
@@ -1,46 +0,0 @@
-package backtype.storm.testing;
-
-import java.util.Map;
-
-/**
- * The param arg for <code>Testing.withSimulatedTimeCluster</code> and
- * <code>Testing.withTrackedCluster</code>
- */
-public class MkClusterParam {
-	/**
-	 * count of supervisors for the cluster.
-	 */
-	private Integer supervisors;
-	/**
-	 * count of port for each supervisor
-	 */
-	private Integer portsPerSupervisor;
-	/**
-	 * cluster config
-	 */
-	private Map daemonConf;
-
-	public Integer getSupervisors() {
-		return supervisors;
-	}
-
-	public void setSupervisors(Integer supervisors) {
-		this.supervisors = supervisors;
-	}
-
-	public Integer getPortsPerSupervisor() {
-		return portsPerSupervisor;
-	}
-
-	public void setPortsPerSupervisor(Integer portsPerSupervisor) {
-		this.portsPerSupervisor = portsPerSupervisor;
-	}
-
-	public Map getDaemonConf() {
-		return daemonConf;
-	}
-
-	public void setDaemonConf(Map daemonConf) {
-		this.daemonConf = daemonConf;
-	}
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/testing/MkTupleParam.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/testing/MkTupleParam.java b/jstorm-client/src/main/java/backtype/storm/testing/MkTupleParam.java
deleted file mode 100644
index a98704d..0000000
--- a/jstorm-client/src/main/java/backtype/storm/testing/MkTupleParam.java
+++ /dev/null
@@ -1,37 +0,0 @@
-package backtype.storm.testing;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class MkTupleParam {
-	private String stream;
-	private String component;
-	private List<String> fields;
-
-	public String getStream() {
-		return stream;
-	}
-
-	public void setStream(String stream) {
-		this.stream = stream;
-	}
-
-	public String getComponent() {
-		return component;
-	}
-
-	public void setComponent(String component) {
-		this.component = component;
-	}
-
-	public List<String> getFields() {
-		return fields;
-	}
-
-	public void setFields(String... fields) {
-		this.fields = new ArrayList<String>();
-		for (int i = 0; i < fields.length; i++) {
-			this.fields.add(fields[i]);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/testing/MockedSources.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/testing/MockedSources.java b/jstorm-client/src/main/java/backtype/storm/testing/MockedSources.java
deleted file mode 100644
index 6c61edb..0000000
--- a/jstorm-client/src/main/java/backtype/storm/testing/MockedSources.java
+++ /dev/null
@@ -1,46 +0,0 @@
-package backtype.storm.testing;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import backtype.storm.tuple.Values;
-import backtype.storm.utils.Utils;
-
-public class MockedSources {
-	/**
-	 * mocked spout sources for the [spout, stream] pair.
-	 */
-	private Map<String, List<FixedTuple>> data = new HashMap<String, List<FixedTuple>>();
-
-	/**
-	 * add mock data for the spout.
-	 * 
-	 * @param spoutId
-	 *            the spout to be mocked
-	 * @param streamId
-	 *            the stream of the spout to be mocked
-	 * @param objects
-	 *            the mocked data
-	 */
-	public void addMockData(String spoutId, String streamId, Values... valueses) {
-		if (!data.containsKey(spoutId)) {
-			data.put(spoutId, new ArrayList<FixedTuple>());
-		}
-
-		List<FixedTuple> tuples = data.get(spoutId);
-		for (int i = 0; i < valueses.length; i++) {
-			FixedTuple tuple = new FixedTuple(streamId, valueses[i]);
-			tuples.add(tuple);
-		}
-	}
-
-	public void addMockData(String spoutId, Values... valueses) {
-		this.addMockData(spoutId, Utils.DEFAULT_STREAM_ID, valueses);
-	}
-
-	public Map<String, List<FixedTuple>> getData() {
-		return this.data;
-	}
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/testing/NGrouping.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/testing/NGrouping.java b/jstorm-client/src/main/java/backtype/storm/testing/NGrouping.java
deleted file mode 100644
index 9954b06..0000000
--- a/jstorm-client/src/main/java/backtype/storm/testing/NGrouping.java
+++ /dev/null
@@ -1,34 +0,0 @@
-package backtype.storm.testing;
-
-import backtype.storm.generated.GlobalStreamId;
-import backtype.storm.grouping.CustomStreamGrouping;
-import backtype.storm.task.WorkerTopologyContext;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-public class NGrouping implements CustomStreamGrouping {
-	int _n;
-	List<Integer> _outTasks;
-
-	public NGrouping(int n) {
-		_n = n;
-	}
-
-	@Override
-	public void prepare(WorkerTopologyContext context, GlobalStreamId stream,
-			List<Integer> targetTasks) {
-		targetTasks = new ArrayList<Integer>(targetTasks);
-		Collections.sort(targetTasks);
-		_outTasks = new ArrayList<Integer>();
-		for (int i = 0; i < _n; i++) {
-			_outTasks.add(targetTasks.get(i));
-		}
-	}
-
-	@Override
-	public List<Integer> chooseTasks(int taskId, List<Object> values) {
-		return _outTasks;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/testing/NonRichBoltTracker.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/testing/NonRichBoltTracker.java b/jstorm-client/src/main/java/backtype/storm/testing/NonRichBoltTracker.java
deleted file mode 100644
index c5918ad..0000000
--- a/jstorm-client/src/main/java/backtype/storm/testing/NonRichBoltTracker.java
+++ /dev/null
@@ -1,34 +0,0 @@
-package backtype.storm.testing;
-
-import backtype.storm.task.IBolt;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.utils.RegisteredGlobalState;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
-
-public class NonRichBoltTracker implements IBolt {
-	IBolt _delegate;
-	String _trackId;
-
-	public NonRichBoltTracker(IBolt delegate, String id) {
-		_delegate = delegate;
-		_trackId = id;
-	}
-
-	public void prepare(Map stormConf, TopologyContext context,
-			OutputCollector collector) {
-		_delegate.prepare(stormConf, context, collector);
-	}
-
-	public void execute(Tuple input) {
-		_delegate.execute(input);
-		Map stats = (Map) RegisteredGlobalState.getState(_trackId);
-		((AtomicInteger) stats.get("processed")).incrementAndGet();
-	}
-
-	public void cleanup() {
-		_delegate.cleanup();
-	}
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/testing/OpaqueMemoryTransactionalSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/testing/OpaqueMemoryTransactionalSpout.java b/jstorm-client/src/main/java/backtype/storm/testing/OpaqueMemoryTransactionalSpout.java
deleted file mode 100644
index 371c622..0000000
--- a/jstorm-client/src/main/java/backtype/storm/testing/OpaqueMemoryTransactionalSpout.java
+++ /dev/null
@@ -1,190 +0,0 @@
-package backtype.storm.testing;
-
-import backtype.storm.Config;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.transactional.TransactionAttempt;
-import backtype.storm.coordination.BatchOutputCollector;
-import backtype.storm.transactional.partitioned.IOpaquePartitionedTransactionalSpout;
-import backtype.storm.tuple.Fields;
-import backtype.storm.utils.RegisteredGlobalState;
-import backtype.storm.utils.Utils;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * This spout only works in local mode.
- */
-public class OpaqueMemoryTransactionalSpout implements
-		IOpaquePartitionedTransactionalSpout<MemoryTransactionalSpoutMeta> {
-	public static String TX_FIELD = MemoryTransactionalSpout.class.getName()
-			+ "/id";
-
-	private String _id;
-	private String _finishedPartitionsId;
-	private String _disabledId;
-	private int _takeAmt;
-	private Fields _outFields;
-
-	public OpaqueMemoryTransactionalSpout(
-			Map<Integer, List<List<Object>>> partitions, Fields outFields,
-			int takeAmt) {
-		_id = RegisteredGlobalState.registerState(partitions);
-
-		Map<Integer, Boolean> finished = Collections
-				.synchronizedMap(new HashMap<Integer, Boolean>());
-		_finishedPartitionsId = RegisteredGlobalState.registerState(finished);
-
-		Map<Integer, Boolean> disabled = Collections
-				.synchronizedMap(new HashMap<Integer, Boolean>());
-		_disabledId = RegisteredGlobalState.registerState(disabled);
-
-		_takeAmt = takeAmt;
-		_outFields = outFields;
-	}
-
-	public void setDisabled(Integer partition, boolean disabled) {
-		getDisabledStatuses().put(partition, disabled);
-	}
-
-	public boolean isExhaustedTuples() {
-		Map<Integer, Boolean> statuses = getFinishedStatuses();
-		for (Integer partition : getQueues().keySet()) {
-			if (!statuses.containsKey(partition)
-					|| !getFinishedStatuses().get(partition)) {
-				return false;
-			}
-		}
-		return true;
-	}
-
-	@Override
-	public IOpaquePartitionedTransactionalSpout.Emitter<MemoryTransactionalSpoutMeta> getEmitter(
-			Map conf, TopologyContext context) {
-		return new Emitter(conf);
-	}
-
-	@Override
-	public IOpaquePartitionedTransactionalSpout.Coordinator getCoordinator(
-			Map conf, TopologyContext context) {
-		return new Coordinator();
-	}
-
-	class Coordinator implements
-			IOpaquePartitionedTransactionalSpout.Coordinator {
-		@Override
-		public boolean isReady() {
-			return true;
-		}
-
-		@Override
-		public void close() {
-		}
-	}
-
-	class Emitter
-			implements
-			IOpaquePartitionedTransactionalSpout.Emitter<MemoryTransactionalSpoutMeta> {
-
-		Integer _maxSpoutPending;
-		Map<Integer, Integer> _emptyPartitions = new HashMap<Integer, Integer>();
-
-		public Emitter(Map conf) {
-			Object c = conf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING);
-			if (c == null)
-				_maxSpoutPending = 1;
-			else
-				_maxSpoutPending = Utils.getInt(c);
-		}
-
-		@Override
-		public MemoryTransactionalSpoutMeta emitPartitionBatch(
-				TransactionAttempt tx, BatchOutputCollector collector,
-				int partition, MemoryTransactionalSpoutMeta lastPartitionMeta) {
-			if (!Boolean.FALSE.equals(getDisabledStatuses().get(partition))) {
-				int index;
-				if (lastPartitionMeta == null) {
-					index = 0;
-				} else {
-					index = lastPartitionMeta.index + lastPartitionMeta.amt;
-				}
-				List<List<Object>> queue = getQueues().get(partition);
-				int total = queue.size();
-				int left = total - index;
-				int toTake = Math.min(left, _takeAmt);
-
-				MemoryTransactionalSpoutMeta ret = new MemoryTransactionalSpoutMeta(
-						index, toTake);
-				for (int i = ret.index; i < ret.index + ret.amt; i++) {
-					List<Object> toEmit = new ArrayList<Object>(queue.get(i));
-					toEmit.add(0, tx);
-					collector.emit(toEmit);
-				}
-				if (toTake == 0) {
-					// this is a pretty hacky way to determine when all the
-					// partitions have been committed
-					// wait until we've emitted max-spout-pending empty
-					// partitions for the partition
-					int curr = Utils.get(_emptyPartitions, partition, 0) + 1;
-					_emptyPartitions.put(partition, curr);
-					if (curr > _maxSpoutPending) {
-						getFinishedStatuses().put(partition, true);
-					}
-				}
-				return ret;
-			} else {
-				return null;
-			}
-		}
-
-		@Override
-		public void close() {
-		}
-
-		@Override
-		public int numPartitions() {
-			return getQueues().size();
-		}
-	}
-
-	@Override
-	public void declareOutputFields(OutputFieldsDeclarer declarer) {
-		List<String> toDeclare = new ArrayList<String>(_outFields.toList());
-		toDeclare.add(0, TX_FIELD);
-		declarer.declare(new Fields(toDeclare));
-	}
-
-	@Override
-	public Map<String, Object> getComponentConfiguration() {
-		Config conf = new Config();
-		conf.registerSerialization(MemoryTransactionalSpoutMeta.class);
-		return conf;
-	}
-
-	public void startup() {
-		getFinishedStatuses().clear();
-	}
-
-	public void cleanup() {
-		RegisteredGlobalState.clearState(_id);
-		RegisteredGlobalState.clearState(_finishedPartitionsId);
-	}
-
-	private Map<Integer, List<List<Object>>> getQueues() {
-		return (Map<Integer, List<List<Object>>>) RegisteredGlobalState
-				.getState(_id);
-	}
-
-	private Map<Integer, Boolean> getFinishedStatuses() {
-		return (Map<Integer, Boolean>) RegisteredGlobalState
-				.getState(_finishedPartitionsId);
-	}
-
-	private Map<Integer, Boolean> getDisabledStatuses() {
-		return (Map<Integer, Boolean>) RegisteredGlobalState
-				.getState(_disabledId);
-	}
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/testing/PrepareBatchBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/testing/PrepareBatchBolt.java b/jstorm-client/src/main/java/backtype/storm/testing/PrepareBatchBolt.java
deleted file mode 100644
index 207455b..0000000
--- a/jstorm-client/src/main/java/backtype/storm/testing/PrepareBatchBolt.java
+++ /dev/null
@@ -1,33 +0,0 @@
-package backtype.storm.testing;
-
-import backtype.storm.topology.BasicOutputCollector;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseBasicBolt;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.utils.Utils;
-import java.util.ArrayList;
-import java.util.List;
-
-public class PrepareBatchBolt extends BaseBasicBolt {
-	Fields _outFields;
-
-	public PrepareBatchBolt(Fields outFields) {
-		_outFields = outFields;
-	}
-
-	@Override
-	public void declareOutputFields(OutputFieldsDeclarer declarer) {
-		declarer.declare(_outFields);
-	}
-
-	@Override
-	public void execute(Tuple input, BasicOutputCollector collector) {
-		long id = Utils.secureRandomLong();
-		List<Object> toEmit = new ArrayList<Object>();
-		toEmit.add(id);
-		toEmit.addAll(input.getValues());
-		collector.emit(toEmit);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/testing/SpoutTracker.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/testing/SpoutTracker.java b/jstorm-client/src/main/java/backtype/storm/testing/SpoutTracker.java
deleted file mode 100644
index a712ee8..0000000
--- a/jstorm-client/src/main/java/backtype/storm/testing/SpoutTracker.java
+++ /dev/null
@@ -1,88 +0,0 @@
-package backtype.storm.testing;
-
-import backtype.storm.spout.ISpoutOutputCollector;
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichSpout;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichSpout;
-import backtype.storm.utils.RegisteredGlobalState;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
-
-public class SpoutTracker extends BaseRichSpout {
-	IRichSpout _delegate;
-	SpoutTrackOutputCollector _tracker;
-	String _trackId;
-
-	private class SpoutTrackOutputCollector implements ISpoutOutputCollector {
-		public int transferred = 0;
-		public int emitted = 0;
-		public SpoutOutputCollector _collector;
-
-		public SpoutTrackOutputCollector(SpoutOutputCollector collector) {
-			_collector = collector;
-		}
-
-		private void recordSpoutEmit() {
-			Map stats = (Map) RegisteredGlobalState.getState(_trackId);
-			((AtomicInteger) stats.get("spout-emitted")).incrementAndGet();
-
-		}
-
-		public List<Integer> emit(String streamId, List<Object> tuple,
-				Object messageId) {
-			List<Integer> ret = _collector.emit(streamId, tuple, messageId);
-			recordSpoutEmit();
-			return ret;
-		}
-
-		public void emitDirect(int taskId, String streamId, List<Object> tuple,
-				Object messageId) {
-			_collector.emitDirect(taskId, streamId, tuple, messageId);
-			recordSpoutEmit();
-		}
-
-		@Override
-		public void reportError(Throwable error) {
-			_collector.reportError(error);
-		}
-	}
-
-	public SpoutTracker(IRichSpout delegate, String trackId) {
-		_delegate = delegate;
-		_trackId = trackId;
-	}
-
-	public void open(Map conf, TopologyContext context,
-			SpoutOutputCollector collector) {
-		_tracker = new SpoutTrackOutputCollector(collector);
-		_delegate.open(conf, context, new SpoutOutputCollector(_tracker));
-	}
-
-	public void close() {
-		_delegate.close();
-	}
-
-	public void nextTuple() {
-		_delegate.nextTuple();
-	}
-
-	public void ack(Object msgId) {
-		_delegate.ack(msgId);
-		Map stats = (Map) RegisteredGlobalState.getState(_trackId);
-		((AtomicInteger) stats.get("processed")).incrementAndGet();
-	}
-
-	public void fail(Object msgId) {
-		_delegate.fail(msgId);
-		Map stats = (Map) RegisteredGlobalState.getState(_trackId);
-		((AtomicInteger) stats.get("processed")).incrementAndGet();
-	}
-
-	public void declareOutputFields(OutputFieldsDeclarer declarer) {
-		_delegate.declareOutputFields(declarer);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/testing/TestAggregatesCounter.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/testing/TestAggregatesCounter.java b/jstorm-client/src/main/java/backtype/storm/testing/TestAggregatesCounter.java
deleted file mode 100644
index 359fa00..0000000
--- a/jstorm-client/src/main/java/backtype/storm/testing/TestAggregatesCounter.java
+++ /dev/null
@@ -1,46 +0,0 @@
-package backtype.storm.testing;
-
-import backtype.storm.topology.base.BaseRichBolt;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Fields;
-import java.util.Map;
-import backtype.storm.task.TopologyContext;
-import java.util.HashMap;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import static backtype.storm.utils.Utils.tuple;
-
-public class TestAggregatesCounter extends BaseRichBolt {
-	public static Logger LOG = LoggerFactory.getLogger(TestWordCounter.class);
-
-	Map<String, Integer> _counts;
-	OutputCollector _collector;
-
-	public void prepare(Map stormConf, TopologyContext context,
-			OutputCollector collector) {
-		_collector = collector;
-		_counts = new HashMap<String, Integer>();
-	}
-
-	public void execute(Tuple input) {
-		String word = (String) input.getValues().get(0);
-		int count = (Integer) input.getValues().get(1);
-		_counts.put(word, count);
-		int globalCount = 0;
-		for (String w : _counts.keySet()) {
-			globalCount += _counts.get(w);
-		}
-		_collector.emit(tuple(globalCount));
-		_collector.ack(input);
-	}
-
-	public void cleanup() {
-
-	}
-
-	public void declareOutputFields(OutputFieldsDeclarer declarer) {
-		declarer.declare(new Fields("agg-global"));
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/testing/TestConfBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/testing/TestConfBolt.java b/jstorm-client/src/main/java/backtype/storm/testing/TestConfBolt.java
deleted file mode 100644
index c6fe3d6..0000000
--- a/jstorm-client/src/main/java/backtype/storm/testing/TestConfBolt.java
+++ /dev/null
@@ -1,44 +0,0 @@
-package backtype.storm.testing;
-
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.BasicOutputCollector;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseBasicBolt;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-import java.util.Map;
-
-public class TestConfBolt extends BaseBasicBolt {
-	Map<String, Object> _componentConf;
-	Map<String, Object> _conf;
-
-	public TestConfBolt() {
-		this(null);
-	}
-
-	public TestConfBolt(Map<String, Object> componentConf) {
-		_componentConf = componentConf;
-	}
-
-	@Override
-	public void prepare(Map conf, TopologyContext context) {
-		_conf = conf;
-	}
-
-	@Override
-	public void declareOutputFields(OutputFieldsDeclarer declarer) {
-		declarer.declare(new Fields("conf", "value"));
-	}
-
-	@Override
-	public void execute(Tuple input, BasicOutputCollector collector) {
-		String name = input.getString(0);
-		collector.emit(new Values(name, _conf.get(name)));
-	}
-
-	@Override
-	public Map<String, Object> getComponentConfiguration() {
-		return _componentConf;
-	}
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/testing/TestGlobalCount.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/testing/TestGlobalCount.java b/jstorm-client/src/main/java/backtype/storm/testing/TestGlobalCount.java
deleted file mode 100644
index 9f9e421..0000000
--- a/jstorm-client/src/main/java/backtype/storm/testing/TestGlobalCount.java
+++ /dev/null
@@ -1,43 +0,0 @@
-package backtype.storm.testing;
-
-import backtype.storm.topology.base.BaseRichBolt;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Fields;
-import java.util.Map;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.tuple.Values;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class TestGlobalCount extends BaseRichBolt {
-	public static Logger LOG = LoggerFactory.getLogger(TestWordCounter.class);
-
-	private int _count;
-	OutputCollector _collector;
-
-	public void prepare(Map stormConf, TopologyContext context,
-			OutputCollector collector) {
-		_collector = collector;
-		_count = 0;
-	}
-
-	public void execute(Tuple input) {
-		_count++;
-		_collector.emit(input, new Values(_count));
-		_collector.ack(input);
-	}
-
-	public void cleanup() {
-
-	}
-
-	public Fields getOutputFields() {
-		return new Fields("global-count");
-	}
-
-	public void declareOutputFields(OutputFieldsDeclarer declarer) {
-		declarer.declare(new Fields("global-count"));
-	}
-}