Posted to commits@storm.apache.org by pt...@apache.org on 2015/11/05 21:41:06 UTC

[27/60] [abbrv] [partial] storm git commit: Release 2.0.4-SNAPSHOT

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/serialization/KryoTupleDeserializer.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/serialization/KryoTupleDeserializer.java b/jstorm-client/src/main/java/backtype/storm/serialization/KryoTupleDeserializer.java
deleted file mode 100644
index 12d9193..0000000
--- a/jstorm-client/src/main/java/backtype/storm/serialization/KryoTupleDeserializer.java
+++ /dev/null
@@ -1,90 +0,0 @@
-package backtype.storm.serialization;
-
-import backtype.storm.task.GeneralTopologyContext;
-import backtype.storm.tuple.MessageId;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.TupleImplExt;
-
-import com.esotericsoftware.kryo.io.Input;
-
-import java.io.IOException;
-import java.net.URLClassLoader;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.log4j.Logger;
-
-public class KryoTupleDeserializer implements ITupleDeserializer {
-	private static final Logger LOG = Logger.getLogger(KryoTupleDeserializer.class);
-	
-	public static final boolean USE_RAW_PACKET = true;
-
-	GeneralTopologyContext _context;
-	KryoValuesDeserializer _kryo;
-	SerializationFactory.IdDictionary _ids;
-	Input _kryoInput;
-
-	public KryoTupleDeserializer(final Map conf,
-			final GeneralTopologyContext context) {
-		_kryo = new KryoValuesDeserializer(conf);
-		_context = context;
-		_ids = new SerializationFactory.IdDictionary(context.getRawTopology());
-		_kryoInput = new Input(1);
-	}
-
-	public Tuple deserialize(byte[] ser) {
-		
-		int targetTaskId = 0;
-		int taskId = 0;
-		int streamId = 0;
-		String componentName = null;
-		String streamName = null;
-		MessageId id = null;
-		
-		try {
-
-			_kryoInput.setBuffer(ser);
-
-			targetTaskId = _kryoInput.readInt();
-			taskId = _kryoInput.readInt(true);
-			streamId = _kryoInput.readInt(true);
-			componentName = _context.getComponentId(taskId);
-			streamName = _ids.getStreamName(componentName, streamId);
-			id = MessageId.deserialize(_kryoInput);
-			List<Object> values = _kryo.deserializeFrom(_kryoInput);
-			TupleImplExt tuple = new TupleImplExt(_context, values, taskId,
-					streamName, id);
-			tuple.setTargetTaskId(targetTaskId);
-			return tuple;
-		} catch (Throwable e) {
-			StringBuilder sb = new StringBuilder();
-			
-			sb.append("Deserialize error:");
-			sb.append("targetTaskId:").append(targetTaskId);
-			sb.append(",taskId:").append(taskId);
-			sb.append(",streamId:").append(streamId);
-			sb.append(",componentName:").append(componentName);
-			sb.append(",streamName:").append(streamName);
-			sb.append(",MessageId").append(id);
-			
-			LOG.error(sb.toString(), e);
-			throw new RuntimeException(e);
-		}
-	}
-
-	/**
-	 * Reads only the target task id from a serialized tuple.
-	 * 
-	 * @param ser the serialized tuple
-	 * @return the target task id
-	 */
-	public static int deserializeTaskId(byte[] ser) {
-		Input _kryoInput = new Input(1);
-
-		_kryoInput.setBuffer(ser);
-
-		int targetTaskId = _kryoInput.readInt();
-
-		return targetTaskId;
-	}
-}

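The deserializer above fixes the wire layout of a tuple: a raw int target task id first, then the source task id and stream id as varints, the MessageId, and finally the Kryo-encoded values. Because the target task id always leads, deserializeTaskId can route a serialized tuple without paying for full deserialization. A minimal sketch of that use; the router class here is hypothetical, not part of the deleted code:

    import backtype.storm.serialization.KryoTupleDeserializer;

    public class TupleRouter {
        // Peek only the leading raw int (the target task id) and route on it,
        // skipping the MessageId and the payload entirely.
        public int targetOf(byte[] serializedTuple) {
            return KryoTupleDeserializer.deserializeTaskId(serializedTuple);
        }
    }
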
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/serialization/KryoTupleSerializer.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/serialization/KryoTupleSerializer.java b/jstorm-client/src/main/java/backtype/storm/serialization/KryoTupleSerializer.java
deleted file mode 100644
index e04b145..0000000
--- a/jstorm-client/src/main/java/backtype/storm/serialization/KryoTupleSerializer.java
+++ /dev/null
@@ -1,67 +0,0 @@
-package backtype.storm.serialization;
-
-import backtype.storm.task.GeneralTopologyContext;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.TupleExt;
-
-import com.esotericsoftware.kryo.io.Output;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Map;
-
-public class KryoTupleSerializer implements ITupleSerializer {
-	KryoValuesSerializer _kryo;
-	SerializationFactory.IdDictionary _ids;
-	Output _kryoOut;
-
-	public KryoTupleSerializer(final Map conf,
-			final GeneralTopologyContext context) {
-		_kryo = new KryoValuesSerializer(conf);
-		_kryoOut = new Output(2000, 2000000000);
-		_ids = new SerializationFactory.IdDictionary(context.getRawTopology());
-	}
-
-	/**
-	 * @@@ in the future, this will skip serializing 'targetTask' by checking
-	 *     a flag
-	 * @see backtype.storm.serialization.ITupleSerializer#serialize(int,
-	 *      backtype.storm.tuple.Tuple)
-	 */
-	public byte[] serialize(Tuple tuple) {
-		try {
-
-			_kryoOut.clear();
-			if (tuple instanceof TupleExt) {
-				_kryoOut.writeInt(((TupleExt) tuple).getTargetTaskId());
-			}
-
-			_kryoOut.writeInt(tuple.getSourceTask(), true);
-			_kryoOut.writeInt(
-					_ids.getStreamId(tuple.getSourceComponent(),
-							tuple.getSourceStreamId()), true);
-			tuple.getMessageId().serialize(_kryoOut);
-			_kryo.serializeInto(tuple.getValues(), _kryoOut);
-			return _kryoOut.toBytes();
-		} catch (IOException e) {
-			throw new RuntimeException(e);
-		}
-	}
-
-	public static byte[] serialize(int targetTask) {
-		ByteBuffer buff = ByteBuffer.allocate((Integer.SIZE / 8));
-		buff.putInt(targetTask);
-		byte[] rtn = buff.array();
-		return rtn;
-	}
-
-	// public long crc32(Tuple tuple) {
-	// try {
-	// CRC32OutputStream hasher = new CRC32OutputStream();
-	// _kryo.serializeInto(tuple.getValues(), hasher);
-	// return hasher.getValue();
-	// } catch (IOException e) {
-	// throw new RuntimeException(e);
-	// }
-	// }
-}

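The serializer writes the same layout the deserializer reads back: the target task id (only for TupleExt instances), the source task and the dictionary-compressed stream id as varints, the MessageId, then the values. Note the asymmetry: deserialize always reads a leading target task id, so only TupleExt tuples survive a round trip through this pair. A hedged round-trip fragment; conf, context, and tuple are assumed to come from a running topology and are not constructed here:

    // `conf` is the topology configuration Map, `context` a
    // GeneralTopologyContext, and `tuple` a TupleExt; all are assumed.
    KryoTupleSerializer ser = new KryoTupleSerializer(conf, context);
    KryoTupleDeserializer des = new KryoTupleDeserializer(conf, context);

    byte[] bytes = ser.serialize(tuple);
    Tuple roundTripped = des.deserialize(bytes);
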
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/serialization/KryoValuesDeserializer.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/serialization/KryoValuesDeserializer.java b/jstorm-client/src/main/java/backtype/storm/serialization/KryoValuesDeserializer.java
deleted file mode 100644
index c1f3a80..0000000
--- a/jstorm-client/src/main/java/backtype/storm/serialization/KryoValuesDeserializer.java
+++ /dev/null
@@ -1,39 +0,0 @@
-package backtype.storm.serialization;
-
-import java.io.IOException;
-import java.net.URLClassLoader;
-import java.util.List;
-import java.util.Map;
-
-import backtype.storm.utils.ListDelegate;
-
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.io.Input;
-
-public class KryoValuesDeserializer {
-
-	Kryo _kryo;
-	Input _kryoInput;
-
-	public KryoValuesDeserializer(Map conf) {
-		this._kryo = SerializationFactory.getKryo(conf);
-		this._kryoInput = new Input(1);
-	}
-
-	public List<Object> deserializeFrom(Input input) {
-		ListDelegate delegate = (ListDelegate) _kryo.readObject(input,
-				ListDelegate.class);
-		return delegate.getDelegate();
-	}
-
-	public List<Object> deserialize(byte[] ser) throws IOException {
-		_kryoInput.setBuffer(ser);
-		return deserializeFrom(_kryoInput);
-	}
-
-	public Object deserializeObject(byte[] ser) throws IOException {
-		_kryoInput.setBuffer(ser);
-		return _kryo.readClassAndObject(_kryoInput);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/serialization/KryoValuesSerializer.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/serialization/KryoValuesSerializer.java b/jstorm-client/src/main/java/backtype/storm/serialization/KryoValuesSerializer.java
deleted file mode 100644
index 6072282..0000000
--- a/jstorm-client/src/main/java/backtype/storm/serialization/KryoValuesSerializer.java
+++ /dev/null
@@ -1,45 +0,0 @@
-package backtype.storm.serialization;
-
-import backtype.storm.utils.ListDelegate;
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.io.Output;
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
-public class KryoValuesSerializer {
-	Kryo _kryo;
-	ListDelegate _delegate;
-	Output _kryoOut;
-
-	public KryoValuesSerializer(Map conf) {
-		_kryo = SerializationFactory.getKryo(conf);
-		_delegate = new ListDelegate();
-		_kryoOut = new Output(2000, 2000000000);
-	}
-
-	public void serializeInto(List<Object> values, Output out)
-			throws IOException {
-		// This ensures that a list of values is always written the same way,
-		// regardless of whether it's a Java collection or one of Clojure's
-		// persistent collections (which have different serializers).
-		// Doing this lets us deserialize as ArrayList and avoid writing the
-		// class here.
-		_delegate.setDelegate(values);
-		_kryo.writeObject(out, _delegate);
-	}
-
-	public byte[] serialize(List<Object> values) throws IOException {
-		_kryoOut.clear();
-		serializeInto(values, _kryoOut);
-		return _kryoOut.toBytes();
-	}
-
-	public byte[] serializeObject(Object obj) {
-		_kryoOut.clear();
-		_kryo.writeClassAndObject(_kryoOut, obj);
-		return _kryoOut.toBytes();
-	}
-}

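The ListDelegate indirection in serializeInto means every value list, whatever its concrete type, is written by the single serializer registered for ListDelegate and read back as an ArrayList. A minimal round-trip sketch, assuming Utils.readDefaultConfig() is available and the bundled defaults supply topology.kryo.factory:

    import backtype.storm.serialization.KryoValuesDeserializer;
    import backtype.storm.serialization.KryoValuesSerializer;
    import backtype.storm.utils.Utils;
    import java.io.IOException;
    import java.util.Arrays;
    import java.util.List;
    import java.util.Map;

    public class ValuesRoundTrip {
        public static void main(String[] args) throws IOException {
            Map conf = Utils.readDefaultConfig(); // assumed: defaults on classpath
            KryoValuesSerializer ser = new KryoValuesSerializer(conf);
            KryoValuesDeserializer des = new KryoValuesDeserializer(conf);

            byte[] bytes = ser.serialize(Arrays.<Object>asList("word", 42));
            List<Object> values = des.deserialize(bytes); // an ArrayList
            System.out.println(values);
        }
    }
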
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/serialization/SerializableSerializer.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/serialization/SerializableSerializer.java b/jstorm-client/src/main/java/backtype/storm/serialization/SerializableSerializer.java
deleted file mode 100644
index 4fcaf02..0000000
--- a/jstorm-client/src/main/java/backtype/storm/serialization/SerializableSerializer.java
+++ /dev/null
@@ -1,46 +0,0 @@
-package backtype.storm.serialization;
-
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.Serializer;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-
-import org.apache.commons.io.input.ClassLoaderObjectInputStream;
-
-public class SerializableSerializer extends Serializer<Object> {
-
-	@Override
-	public void write(Kryo kryo, Output output, Object object) {
-		ByteArrayOutputStream bos = new ByteArrayOutputStream();
-		try {
-			ObjectOutputStream oos = new ObjectOutputStream(bos);
-			oos.writeObject(object);
-			oos.flush();
-		} catch (IOException e) {
-			throw new RuntimeException(e);
-		}
-		byte[] ser = bos.toByteArray();
-		output.writeInt(ser.length);
-		output.writeBytes(ser);
-	}
-
-	@Override
-	public Object read(Kryo kryo, Input input, Class c) {
-		int len = input.readInt();
-		byte[] ser = new byte[len];
-		input.readBytes(ser);
-		ByteArrayInputStream bis = new ByteArrayInputStream(ser);
-		try {
-			ClassLoaderObjectInputStream ois = new ClassLoaderObjectInputStream(
-					kryo.getClassLoader(), bis);
-			return ois.readObject();
-		} catch (Exception e) {
-			throw new RuntimeException(e);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/serialization/SerializationFactory.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/serialization/SerializationFactory.java b/jstorm-client/src/main/java/backtype/storm/serialization/SerializationFactory.java
deleted file mode 100644
index 88f7803..0000000
--- a/jstorm-client/src/main/java/backtype/storm/serialization/SerializationFactory.java
+++ /dev/null
@@ -1,242 +0,0 @@
-package backtype.storm.serialization;
-
-import backtype.storm.Config;
-import backtype.storm.generated.ComponentCommon;
-import backtype.storm.generated.StormTopology;
-import backtype.storm.serialization.types.ArrayListSerializer;
-import backtype.storm.serialization.types.ListDelegateSerializer;
-import backtype.storm.serialization.types.HashMapSerializer;
-import backtype.storm.serialization.types.HashSetSerializer;
-import backtype.storm.transactional.TransactionAttempt;
-import backtype.storm.tuple.Values;
-import backtype.storm.utils.ListDelegate;
-import backtype.storm.utils.Utils;
-import backtype.storm.utils.WorkerClassLoader;
-import carbonite.JavaBridge;
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.Serializer;
-import com.esotericsoftware.kryo.serializers.DefaultSerializers.BigIntegerSerializer;
-import java.math.BigInteger;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class SerializationFactory {
-	public static final Logger LOG = LoggerFactory
-			.getLogger(SerializationFactory.class);
-
-	public static Kryo getKryo(Map conf) {
-		IKryoFactory kryoFactory = (IKryoFactory) Utils
-				.newInstance((String) conf.get(Config.TOPOLOGY_KRYO_FACTORY));
-		Kryo k = kryoFactory.getKryo(conf);
-		if (WorkerClassLoader.getInstance() != null)
-			k.setClassLoader(WorkerClassLoader.getInstance());
-		k.register(byte[].class);
-
-		/* tuple payload serializer is specified via configuration */
-		String payloadSerializerName = (String) conf
-				.get(Config.TOPOLOGY_TUPLE_SERIALIZER);
-		try {
-			Class serializerClass = Class.forName(
-					payloadSerializerName, true, k.getClassLoader());
-			Serializer serializer = resolveSerializerInstance(k,
-					ListDelegate.class, serializerClass, conf);
-			k.register(ListDelegate.class, serializer);
-		} catch (ClassNotFoundException ex) {
-			throw new RuntimeException(ex);
-		}
-
-		k.register(ArrayList.class, new ArrayListSerializer());
-		k.register(HashMap.class, new HashMapSerializer());
-		k.register(HashSet.class, new HashSetSerializer());
-		k.register(BigInteger.class, new BigIntegerSerializer());
-		k.register(TransactionAttempt.class);
-		k.register(Values.class);
-		k.register(backtype.storm.metric.api.IMetricsConsumer.DataPoint.class);
-		k.register(backtype.storm.metric.api.IMetricsConsumer.TaskInfo.class);
-		try {
-			JavaBridge.registerPrimitives(k);
-			JavaBridge.registerCollections(k);
-		} catch (Exception e) {
-			throw new RuntimeException(e);
-		}
-
-		Map<String, String> registrations = normalizeKryoRegister(conf);
-
-		kryoFactory.preRegister(k, conf);
-
-		boolean skipMissing = (Boolean) conf
-				.get(Config.TOPOLOGY_SKIP_MISSING_KRYO_REGISTRATIONS);
-		for (String klassName : registrations.keySet()) {
-			String serializerClassName = registrations.get(klassName);
-			try {
-				Class klass = Class.forName(
-						klassName, true, k.getClassLoader());
-						
-				Class serializerClass = null;
-				if (serializerClassName != null)
-					serializerClass = Class.forName(
-							serializerClassName, true, k.getClassLoader());
-				if (serializerClass == null) {
-					k.register(klass);
-				} else {
-					k.register(
-							klass,
-							resolveSerializerInstance(k, klass,
-									serializerClass, conf));
-				}
-			} catch (ClassNotFoundException e) {
-				if (skipMissing) {
-					LOG.info("Could not find serialization or class for "
-							+ serializerClassName
-							+ ". Skipping registration...");
-				} else {
-					throw new RuntimeException(e);
-				}
-			}
-		}
-
-		kryoFactory.postRegister(k, conf);
-
-		if (conf.get(Config.TOPOLOGY_KRYO_DECORATORS) != null) {
-			for (String klassName : (List<String>) conf
-					.get(Config.TOPOLOGY_KRYO_DECORATORS)) {
-				try {
-					Class klass = Class.forName(
-							klassName, true, k.getClassLoader());
-					IKryoDecorator decorator = (IKryoDecorator) klass
-							.newInstance();
-					decorator.decorate(k);
-				} catch (ClassNotFoundException e) {
-					if (skipMissing) {
-						LOG.info("Could not find kryo decorator named "
-								+ klassName + ". Skipping registration...");
-					} else {
-						throw new RuntimeException(e);
-					}
-				} catch (InstantiationException e) {
-					throw new RuntimeException(e);
-				} catch (IllegalAccessException e) {
-					throw new RuntimeException(e);
-				}
-			}
-		}
-
-		kryoFactory.postDecorate(k, conf);
-
-		return k;
-	}
-
-	public static class IdDictionary {
-		Map<String, Map<String, Integer>> streamNametoId = new HashMap<String, Map<String, Integer>>();
-		Map<String, Map<Integer, String>> streamIdToName = new HashMap<String, Map<Integer, String>>();
-
-		public IdDictionary(StormTopology topology) {
-			List<String> componentNames = new ArrayList<String>(topology
-					.get_spouts().keySet());
-			componentNames.addAll(topology.get_bolts().keySet());
-			componentNames.addAll(topology.get_state_spouts().keySet());
-
-			for (String name : componentNames) {
-				ComponentCommon common = Utils.getComponentCommon(topology,
-						name);
-				List<String> streams = new ArrayList<String>(common
-						.get_streams().keySet());
-				streamNametoId.put(name, idify(streams));
-				streamIdToName.put(name,
-						Utils.reverseMap(streamNametoId.get(name)));
-			}
-		}
-
-		public int getStreamId(String component, String stream) {
-			return streamNametoId.get(component).get(stream);
-		}
-
-		public String getStreamName(String component, int stream) {
-			return streamIdToName.get(component).get(stream);
-		}
-
-		private static Map<String, Integer> idify(List<String> names) {
-			Collections.sort(names);
-			Map<String, Integer> ret = new HashMap<String, Integer>();
-			int i = 1;
-			for (String name : names) {
-				ret.put(name, i);
-				i++;
-			}
-			return ret;
-		}
-	}
-
-	private static Serializer resolveSerializerInstance(Kryo k,
-			Class superClass, Class<? extends Serializer> serializerClass,
-			Map conf) {
-		try {
-			try {
-				return serializerClass.getConstructor(Kryo.class, Class.class,
-						Map.class).newInstance(k, superClass, conf);
-			} catch (Exception ex1) {
-				try {
-					return serializerClass.getConstructor(Kryo.class,
-							Class.class).newInstance(k, superClass);
-				} catch (Exception ex2) {
-					try {
-						return serializerClass.getConstructor(Kryo.class,
-								Map.class).newInstance(k, conf);
-					} catch (Exception ex3) {
-						try {
-							return serializerClass.getConstructor(Kryo.class)
-									.newInstance(k);
-						} catch (Exception ex4) {
-							try {
-								return serializerClass.getConstructor(
-										Class.class, Map.class).newInstance(
-										superClass, conf);
-							} catch (Exception ex5) {
-								try {
-									return serializerClass.getConstructor(
-											Class.class)
-											.newInstance(superClass);
-								} catch (Exception ex6) {
-									return serializerClass.newInstance();
-								}
-							}
-						}
-					}
-				}
-			}
-		} catch (Exception ex) {
-			throw new IllegalArgumentException("Unable to create serializer \""
-					+ serializerClass.getName() + "\" for class: "
-					+ superClass.getName(), ex);
-		}
-	}
-
-	private static Map<String, String> normalizeKryoRegister(Map conf) {
-		// TODO: de-duplicate this logic with the code in nimbus
-		Object res = conf.get(Config.TOPOLOGY_KRYO_REGISTER);
-		if (res == null)
-			return new TreeMap<String, String>();
-		Map<String, String> ret = new HashMap<String, String>();
-		if (res instanceof Map) {
-			ret = (Map<String, String>) res;
-		} else {
-			for (Object o : (List) res) {
-				if (o instanceof Map) {
-					ret.putAll((Map) o);
-				} else {
-					ret.put((String) o, null);
-				}
-			}
-		}
-
-		// ensure always same order for registrations with TreeMap
-		return new TreeMap<String, String>(ret);
-	}
-}

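User registrations enter through TOPOLOGY_KRYO_REGISTER, which normalizeKryoRegister flattens into a TreeMap so every worker registers classes in the same sorted order and therefore assigns them identical Kryo ids. The usual way to populate that config is through the Config helpers; a hedged fragment in which MyEvent and MyEventSerializer are hypothetical:

    import backtype.storm.Config;

    Config conf = new Config();
    // Register a class with Kryo's default serializer:
    conf.registerSerialization(MyEvent.class);
    // Or pair it with a custom com.esotericsoftware.kryo.Serializer subclass:
    conf.registerSerialization(MyEvent.class, MyEventSerializer.class);
    // Log and skip, instead of failing on, registrations whose classes are
    // missing from a worker's classpath:
    conf.put(Config.TOPOLOGY_SKIP_MISSING_KRYO_REGISTRATIONS, true);
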
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/serialization/types/ArrayListSerializer.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/serialization/types/ArrayListSerializer.java b/jstorm-client/src/main/java/backtype/storm/serialization/types/ArrayListSerializer.java
deleted file mode 100644
index e403a95..0000000
--- a/jstorm-client/src/main/java/backtype/storm/serialization/types/ArrayListSerializer.java
+++ /dev/null
@@ -1,14 +0,0 @@
-package backtype.storm.serialization.types;
-
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.serializers.CollectionSerializer;
-import java.util.ArrayList;
-import java.util.Collection;
-
-public class ArrayListSerializer extends CollectionSerializer {
-	@Override
-	public Collection create(Kryo kryo, Input input, Class<Collection> type) {
-		return new ArrayList();
-	}
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/serialization/types/HashMapSerializer.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/serialization/types/HashMapSerializer.java b/jstorm-client/src/main/java/backtype/storm/serialization/types/HashMapSerializer.java
deleted file mode 100644
index c1f7456..0000000
--- a/jstorm-client/src/main/java/backtype/storm/serialization/types/HashMapSerializer.java
+++ /dev/null
@@ -1,14 +0,0 @@
-package backtype.storm.serialization.types;
-
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.serializers.MapSerializer;
-import java.util.HashMap;
-import java.util.Map;
-
-public class HashMapSerializer extends MapSerializer {
-	@Override
-	public Map create(Kryo kryo, Input input, Class<Map> type) {
-		return new HashMap();
-	}
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/serialization/types/HashSetSerializer.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/serialization/types/HashSetSerializer.java b/jstorm-client/src/main/java/backtype/storm/serialization/types/HashSetSerializer.java
deleted file mode 100644
index b28bbd6..0000000
--- a/jstorm-client/src/main/java/backtype/storm/serialization/types/HashSetSerializer.java
+++ /dev/null
@@ -1,14 +0,0 @@
-package backtype.storm.serialization.types;
-
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.serializers.CollectionSerializer;
-import java.util.Collection;
-import java.util.HashSet;
-
-public class HashSetSerializer extends CollectionSerializer {
-	@Override
-	public Collection create(Kryo kryo, Input input, Class<Collection> type) {
-		return new HashSet();
-	}
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/serialization/types/ListDelegateSerializer.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/serialization/types/ListDelegateSerializer.java b/jstorm-client/src/main/java/backtype/storm/serialization/types/ListDelegateSerializer.java
deleted file mode 100644
index 67242a2..0000000
--- a/jstorm-client/src/main/java/backtype/storm/serialization/types/ListDelegateSerializer.java
+++ /dev/null
@@ -1,14 +0,0 @@
-package backtype.storm.serialization.types;
-
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.serializers.CollectionSerializer;
-import backtype.storm.utils.ListDelegate;
-import java.util.Collection;
-
-public class ListDelegateSerializer extends CollectionSerializer {
-	@Override
-	public Collection create(Kryo kryo, Input input, Class<Collection> type) {
-		return new ListDelegate();
-	}
-}

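ArrayListSerializer, HashMapSerializer, HashSetSerializer, and ListDelegateSerializer are four instances of one pattern: extend Kryo's CollectionSerializer (or MapSerializer) and override create() so deserialization instantiates the desired concrete type. The same pattern covers any other collection; a hypothetical example for LinkedList, not part of the deleted code:

    import com.esotericsoftware.kryo.Kryo;
    import com.esotericsoftware.kryo.io.Input;
    import com.esotericsoftware.kryo.serializers.CollectionSerializer;
    import java.util.Collection;
    import java.util.LinkedList;

    public class LinkedListSerializer extends CollectionSerializer {
        @Override
        public Collection create(Kryo kryo, Input input, Class<Collection> type) {
            // Only the instantiation differs from the serializers above.
            return new LinkedList();
        }
    }
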
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/spout/IMultiSchemableSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/spout/IMultiSchemableSpout.java b/jstorm-client/src/main/java/backtype/storm/spout/IMultiSchemableSpout.java
deleted file mode 100644
index ba31324..0000000
--- a/jstorm-client/src/main/java/backtype/storm/spout/IMultiSchemableSpout.java
+++ /dev/null
@@ -1,7 +0,0 @@
-package backtype.storm.spout;
-
-public interface IMultiSchemableSpout {
-	MultiScheme getScheme();
-
-	void setScheme(MultiScheme scheme);
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/spout/ISchemableSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/spout/ISchemableSpout.java b/jstorm-client/src/main/java/backtype/storm/spout/ISchemableSpout.java
deleted file mode 100644
index 5bbc869..0000000
--- a/jstorm-client/src/main/java/backtype/storm/spout/ISchemableSpout.java
+++ /dev/null
@@ -1,7 +0,0 @@
-package backtype.storm.spout;
-
-public interface ISchemableSpout {
-	Scheme getScheme();
-
-	void setScheme(Scheme scheme);
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/spout/ISpout.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/spout/ISpout.java b/jstorm-client/src/main/java/backtype/storm/spout/ISpout.java
deleted file mode 100644
index 67f94f3..0000000
--- a/jstorm-client/src/main/java/backtype/storm/spout/ISpout.java
+++ /dev/null
@@ -1,116 +0,0 @@
-package backtype.storm.spout;
-
-import backtype.storm.task.TopologyContext;
-import java.util.Map;
-import java.io.Serializable;
-
-/**
- * ISpout is the core interface for implementing spouts. A Spout is responsible
- * for feeding messages into the topology for processing. For every tuple
- * emitted by a spout, Storm will track the (potentially very large) DAG of
- * tuples generated based on a tuple emitted by the spout. When Storm detects
- * that every tuple in that DAG has been successfully processed, it will send an
- * ack message to the Spout.
- * 
- * <p>
-	 * If a tuple fails to be fully processed within the configured timeout for the
- * topology (see {@link backtype.storm.Config}), Storm will send a fail message
- * to the spout for the message.
- * </p>
- * 
- * <p>
- * When a Spout emits a tuple, it can tag the tuple with a message id. The
- * message id can be any type. When Storm acks or fails a message, it will pass
- * back to the spout the same message id to identify which tuple it's referring
- * to. If the spout leaves out the message id, or sets it to null, then Storm
- * will not track the message and the spout will not receive any ack or fail
- * callbacks for the message.
- * </p>
- * 
- * <p>
- * Storm executes ack, fail, and nextTuple all on the same thread. This means
- * that an implementor of an ISpout does not need to worry about concurrency
- * issues between those methods. However, it also means that an implementor must
- * ensure that nextTuple is non-blocking: otherwise the method could block acks
- * and fails that are pending to be processed.
- * </p>
- */
-public interface ISpout extends Serializable {
-	/**
-	 * Called when a task for this component is initialized within a worker on
-	 * the cluster. It provides the spout with the environment in which the
-	 * spout executes.
-	 * 
-	 * <p>
-	 * This includes the following, as described in the parameters below:
-	 * </p>
-	 * 
-	 * @param conf
-	 *            The Storm configuration for this spout. This is the
-	 *            configuration provided to the topology merged in with cluster
-	 *            configuration on this machine.
-	 * @param context
-	 *            This object can be used to get information about this task's
-	 *            place within the topology, including the task id and component
-	 *            id of this task, input and output information, etc.
-	 * @param collector
-	 *            The collector is used to emit tuples from this spout. Tuples
-	 *            can be emitted at any time, including the open and close
-	 *            methods. The collector is thread-safe and should be saved as
-	 *            an instance variable of this spout object.
-	 */
-	void open(Map conf, TopologyContext context, SpoutOutputCollector collector);
-
-	/**
-	 * Called when an ISpout is going to be shutdown. There is no guarentee that
-	 * close will be called, because the supervisor kill -9's worker processes
-	 * on the cluster.
-	 * 
-	 * <p>
-	 * The one context where close is guaranteed to be called is when a
-	 * topology is killed while running Storm in local mode.
-	 * </p>
-	 */
-	void close();
-
-	/**
-	 * Called when a spout has been activated out of a deactivated mode.
-	 * nextTuple will be called on this spout soon. A spout can become activated
-	 * after having been deactivated when the topology is manipulated using the
-	 * `storm` client.
-	 */
-	void activate();
-
-	/**
-	 * Called when a spout has been deactivated. nextTuple will not be called
-	 * while a spout is deactivated. The spout may or may not be reactivated in
-	 * the future.
-	 */
-	void deactivate();
-
-	/**
-	 * When this method is called, Storm is requesting that the Spout emit
-	 * tuples to the output collector. This method should be non-blocking, so if
-	 * the Spout has no tuples to emit, this method should return. nextTuple,
-	 * ack, and fail are all called in a tight loop in a single thread in the
-	 * spout task. When there are no tuples to emit, it is courteous to have
-	 * nextTuple sleep for a short amount of time (like a single millisecond) so
-	 * as not to waste too much CPU.
-	 */
-	void nextTuple();
-
-	/**
-	 * Storm has determined that the tuple emitted by this spout with the msgId
-	 * identifier has been fully processed. Typically, an implementation of this
-	 * method will take that message off the queue and prevent it from being
-	 * replayed.
-	 */
-	void ack(Object msgId);
-
-	/**
-	 * The tuple emitted by this spout with the msgId identifier has failed to
-	 * be fully processed. Typically, an implementation of this method will put
-	 * that message back on the queue to be replayed at a later time.
-	 */
-	void fail(Object msgId);
-}
\ No newline at end of file

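The contract above boils down to two points: keep nextTuple non-blocking, and supply message ids if you want ack/fail callbacks. A hedged minimal spout illustrating both; all names are made up, and a production spout would implement IRichSpout so it can also declare output fields:

    import backtype.storm.spout.ISpout;
    import backtype.storm.spout.SpoutOutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.utils.Utils;
    import java.util.ArrayDeque;
    import java.util.Deque;
    import java.util.Map;

    public class CountingSpout implements ISpout {
        private SpoutOutputCollector collector;
        private long next = 0;
        private final Deque<Long> replays = new ArrayDeque<Long>();

        public void open(Map conf, TopologyContext context,
                SpoutOutputCollector collector) {
            this.collector = collector; // thread-safe; keep as instance field
        }

        public void nextTuple() {
            // Non-blocking: replay a failed value if one is queued,
            // otherwise emit the next fresh value.
            Long replay = replays.poll();
            long value = (replay != null) ? replay : next++;
            collector.emit(Utils.tuple(value), value); // message id == value
        }

        public void ack(Object msgId) { /* fully processed; nothing to do */ }

        public void fail(Object msgId) { replays.add((Long) msgId); }

        public void close() {}
        public void activate() {}
        public void deactivate() {}
    }
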
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/spout/ISpoutOutputCollector.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/spout/ISpoutOutputCollector.java b/jstorm-client/src/main/java/backtype/storm/spout/ISpoutOutputCollector.java
deleted file mode 100644
index 6b66b00..0000000
--- a/jstorm-client/src/main/java/backtype/storm/spout/ISpoutOutputCollector.java
+++ /dev/null
@@ -1,15 +0,0 @@
-package backtype.storm.spout;
-
-import java.util.List;
-
-public interface ISpoutOutputCollector {
-	/**
-	 * Returns the task ids that received the tuples.
-	 */
-	List<Integer> emit(String streamId, List<Object> tuple, Object messageId);
-
-	void emitDirect(int taskId, String streamId, List<Object> tuple,
-			Object messageId);
-
-	void reportError(Throwable error);
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/spout/ISpoutWaitStrategy.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/spout/ISpoutWaitStrategy.java b/jstorm-client/src/main/java/backtype/storm/spout/ISpoutWaitStrategy.java
deleted file mode 100644
index f5a25b3..0000000
--- a/jstorm-client/src/main/java/backtype/storm/spout/ISpoutWaitStrategy.java
+++ /dev/null
@@ -1,18 +0,0 @@
-package backtype.storm.spout;
-
-import java.util.Map;
-
-/**
- * The strategy a spout needs to use when it is waiting. Waiting is triggered
- * in one of two conditions:
- * 
- * 1. nextTuple emits no tuples
- * 2. The spout has hit maxSpoutPending and can't emit any more tuples
- * 
- * The default strategy sleeps for one millisecond.
- */
-public interface ISpoutWaitStrategy {
-	void prepare(Map conf);
-
-	void emptyEmit(long streak);
-}

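emptyEmit receives the length of the current streak of empty nextTuple calls, so a strategy can back off progressively instead of sleeping a fixed interval. A hedged sketch; the class name and the 50 ms cap are invented:

    import backtype.storm.spout.ISpoutWaitStrategy;
    import java.util.Map;

    public class BackoffWaitStrategy implements ISpoutWaitStrategy {
        public void prepare(Map conf) {}

        public void emptyEmit(long streak) {
            try {
                // Sleep longer the longer the spout has been idle,
                // capped at 50 ms.
                Thread.sleep(Math.min(streak, 50));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
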
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/spout/MultiScheme.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/spout/MultiScheme.java b/jstorm-client/src/main/java/backtype/storm/spout/MultiScheme.java
deleted file mode 100644
index e67d036..0000000
--- a/jstorm-client/src/main/java/backtype/storm/spout/MultiScheme.java
+++ /dev/null
@@ -1,12 +0,0 @@
-package backtype.storm.spout;
-
-import java.util.List;
-import java.io.Serializable;
-
-import backtype.storm.tuple.Fields;
-
-public interface MultiScheme extends Serializable {
-	public Iterable<List<Object>> deserialize(byte[] ser);
-
-	public Fields getOutputFields();
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/spout/NothingEmptyEmitStrategy.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/spout/NothingEmptyEmitStrategy.java b/jstorm-client/src/main/java/backtype/storm/spout/NothingEmptyEmitStrategy.java
deleted file mode 100644
index c084b10..0000000
--- a/jstorm-client/src/main/java/backtype/storm/spout/NothingEmptyEmitStrategy.java
+++ /dev/null
@@ -1,14 +0,0 @@
-package backtype.storm.spout;
-
-import java.util.Map;
-
-public class NothingEmptyEmitStrategy implements ISpoutWaitStrategy {
-	@Override
-	public void emptyEmit(long streak) {
-	}
-
-	@Override
-	public void prepare(Map conf) {
-		throw new UnsupportedOperationException("Not supported yet.");
-	}
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/spout/RawMultiScheme.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/spout/RawMultiScheme.java b/jstorm-client/src/main/java/backtype/storm/spout/RawMultiScheme.java
deleted file mode 100644
index 2446b45..0000000
--- a/jstorm-client/src/main/java/backtype/storm/spout/RawMultiScheme.java
+++ /dev/null
@@ -1,20 +0,0 @@
-package backtype.storm.spout;
-
-import java.util.List;
-
-import backtype.storm.tuple.Fields;
-
-import static backtype.storm.utils.Utils.tuple;
-import static java.util.Arrays.asList;
-
-public class RawMultiScheme implements MultiScheme {
-	@Override
-	public Iterable<List<Object>> deserialize(byte[] ser) {
-		return asList(tuple(ser));
-	}
-
-	@Override
-	public Fields getOutputFields() {
-		return new Fields("bytes");
-	}
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/spout/RawScheme.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/spout/RawScheme.java b/jstorm-client/src/main/java/backtype/storm/spout/RawScheme.java
deleted file mode 100644
index 46e9d1c..0000000
--- a/jstorm-client/src/main/java/backtype/storm/spout/RawScheme.java
+++ /dev/null
@@ -1,15 +0,0 @@
-package backtype.storm.spout;
-
-import backtype.storm.tuple.Fields;
-import java.util.List;
-import static backtype.storm.utils.Utils.tuple;
-
-public class RawScheme implements Scheme {
-	public List<Object> deserialize(byte[] ser) {
-		return tuple(ser);
-	}
-
-	public Fields getOutputFields() {
-		return new Fields("bytes");
-	}
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/spout/Scheme.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/spout/Scheme.java b/jstorm-client/src/main/java/backtype/storm/spout/Scheme.java
deleted file mode 100644
index 26bf3ae..0000000
--- a/jstorm-client/src/main/java/backtype/storm/spout/Scheme.java
+++ /dev/null
@@ -1,11 +0,0 @@
-package backtype.storm.spout;
-
-import backtype.storm.tuple.Fields;
-import java.io.Serializable;
-import java.util.List;
-
-public interface Scheme extends Serializable {
-	public List<Object> deserialize(byte[] ser);
-
-	public Fields getOutputFields();
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/spout/SchemeAsMultiScheme.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/spout/SchemeAsMultiScheme.java b/jstorm-client/src/main/java/backtype/storm/spout/SchemeAsMultiScheme.java
deleted file mode 100644
index cc80ef9..0000000
--- a/jstorm-client/src/main/java/backtype/storm/spout/SchemeAsMultiScheme.java
+++ /dev/null
@@ -1,28 +0,0 @@
-package backtype.storm.spout;
-
-import java.util.Arrays;
-import java.util.List;
-
-import backtype.storm.tuple.Fields;
-
-public class SchemeAsMultiScheme implements MultiScheme {
-	public final Scheme scheme;
-
-	public SchemeAsMultiScheme(Scheme scheme) {
-		this.scheme = scheme;
-	}
-
-	@Override
-	public Iterable<List<Object>> deserialize(final byte[] ser) {
-		List<Object> o = scheme.deserialize(ser);
-		if (o == null)
-			return null;
-		else
-			return Arrays.asList(o);
-	}
-
-	@Override
-	public Fields getOutputFields() {
-		return scheme.getOutputFields();
-	}
-}

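A Scheme turns the raw bytes an external source hands to a spout into tuple values, and SchemeAsMultiScheme adapts a one-tuple Scheme to the MultiScheme interface. A hedged sketch of a UTF-8 string scheme; the class name is hypothetical:

    import backtype.storm.spout.Scheme;
    import backtype.storm.tuple.Fields;
    import backtype.storm.utils.Utils;
    import java.io.UnsupportedEncodingException;
    import java.util.List;

    public class StringScheme implements Scheme {
        public List<Object> deserialize(byte[] ser) {
            try {
                // One output field, so one value per tuple.
                return Utils.tuple(new String(ser, "UTF-8"));
            } catch (UnsupportedEncodingException e) {
                throw new RuntimeException(e);
            }
        }

        public Fields getOutputFields() {
            return new Fields("str");
        }
    }

A spout that expects a MultiScheme can then wrap it: new SchemeAsMultiScheme(new StringScheme()).
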
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/spout/ShellSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/spout/ShellSpout.java b/jstorm-client/src/main/java/backtype/storm/spout/ShellSpout.java
deleted file mode 100644
index a8e18dc..0000000
--- a/jstorm-client/src/main/java/backtype/storm/spout/ShellSpout.java
+++ /dev/null
@@ -1,260 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package backtype.storm.spout;
-
-import backtype.storm.Config;
-import backtype.storm.generated.ShellComponent;
-import backtype.storm.metric.api.IMetric;
-import backtype.storm.metric.api.rpc.IShellMetric;
-import backtype.storm.multilang.ShellMsg;
-import backtype.storm.multilang.SpoutMsg;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.utils.ShellProcess;
-import java.util.Map;
-import java.util.List;
-import java.util.TimerTask;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-import clojure.lang.RT;
-import com.google.common.util.concurrent.MoreExecutors;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-public class ShellSpout implements ISpout {
-    public static Logger LOG = LoggerFactory.getLogger(ShellSpout.class);
-
-    private SpoutOutputCollector _collector;
-    private String[] _command;
-    private ShellProcess _process;
-    
-    private TopologyContext _context;
-    
-    private SpoutMsg _spoutMsg;
-
-    private int workerTimeoutMills;
-    private ScheduledExecutorService heartBeatExecutorService;
-    private AtomicLong lastHeartbeatTimestamp = new AtomicLong();
-
-    public ShellSpout(ShellComponent component) {
-        this(component.get_execution_command(), component.get_script());
-    }
-
-    public ShellSpout(String... command) {
-        _command = command;
-    }
-
-    public void open(Map stormConf, TopologyContext context,
-                     SpoutOutputCollector collector) {
-        _collector = collector;
-        _context = context;
-
-        workerTimeoutMills = 1000 * RT.intCast(stormConf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS));
-
-        _process = new ShellProcess(_command);
-
-        Number subpid = _process.launch(stormConf, context);
-        LOG.info("Launched subprocess with pid " + subpid);
-
-        heartBeatExecutorService = MoreExecutors.getExitingScheduledExecutorService(new ScheduledThreadPoolExecutor(1));
-    }
-
-    public void close() {
-        heartBeatExecutorService.shutdownNow();
-        _process.destroy();
-    }
-
-    public void nextTuple() {
-        if (_spoutMsg == null) {
-            _spoutMsg = new SpoutMsg();
-        }
-        _spoutMsg.setCommand("next");
-        _spoutMsg.setId("");
-        querySubprocess();
-    }
-
-    public void ack(Object msgId) {
-        if (_spoutMsg == null) {
-            _spoutMsg = new SpoutMsg();
-        }
-        _spoutMsg.setCommand("ack");
-        _spoutMsg.setId(msgId);
-        querySubprocess();
-    }
-
-    public void fail(Object msgId) {
-        if (_spoutMsg == null) {
-            _spoutMsg = new SpoutMsg();
-        }
-        _spoutMsg.setCommand("fail");
-        _spoutMsg.setId(msgId);
-        querySubprocess();
-    }
-    
-    private void handleMetrics(ShellMsg shellMsg) {
-        //get metric name
-        String name = shellMsg.getMetricName();
-        if (name.isEmpty()) {
-            throw new RuntimeException("Receive Metrics name is empty");
-        }
-        
-        //get metric by name
-        IMetric iMetric = _context.getRegisteredMetricByName(name);
-        if (iMetric == null) {
-            throw new RuntimeException("Could not find metric by name["+name+"] ");
-        }
-        if ( !(iMetric instanceof IShellMetric)) {
-            throw new RuntimeException("Metric["+name+"] is not IShellMetric, can not call by RPC");
-        }
-        IShellMetric iShellMetric = (IShellMetric)iMetric;
-        
-        //call updateMetricFromRPC with params
-        Object paramsObj = shellMsg.getMetricParams();
-        try {
-            iShellMetric.updateMetricFromRPC(paramsObj);
-        } catch (RuntimeException re) {
-            throw re;
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }       
-    }
-
-    private void querySubprocess() {
-        try {
-            _process.writeSpoutMsg(_spoutMsg);
-
-            while (true) {
-                ShellMsg shellMsg = _process.readShellMsg();
-                String command = shellMsg.getCommand();
-                if (command == null) {
-                    throw new IllegalArgumentException("Command not found in spout message: " + shellMsg);
-                }
-
-                setHeartbeat();
-
-                if (command.equals("sync")) {
-                    return;
-                } else if (command.equals("log")) {
-                    handleLog(shellMsg);
-                } else if (command.equals("emit")) {
-                    String stream = shellMsg.getStream();
-                    Long task = shellMsg.getTask();
-                    List<Object> tuple = shellMsg.getTuple();
-                    Object messageId = shellMsg.getId();
-                    if (task == 0) {
-                        List<Integer> outtasks = _collector.emit(stream, tuple, messageId);
-                        if (shellMsg.areTaskIdsNeeded()) {
-                            _process.writeTaskIds(outtasks);
-                        }
-                    } else {
-                        _collector.emitDirect((int) task.longValue(), stream, tuple, messageId);
-                    }
-                } else if (command.equals("metrics")) {
-                    handleMetrics(shellMsg);
-                } else {
-                    throw new RuntimeException("Unknown command received: " + command);
-                }
-            }
-        } catch (Exception e) {
-            String processInfo = _process.getProcessInfoString() + _process.getProcessTerminationInfoString();
-            throw new RuntimeException(processInfo, e);
-        }
-    }
-
-    private void handleLog(ShellMsg shellMsg) {
-        String msg = shellMsg.getMsg();
-        msg = "ShellLog " + _process.getProcessInfoString() + " " + msg;
-        ShellMsg.ShellLogLevel logLevel = shellMsg.getLogLevel();
-
-        switch (logLevel) {
-            case TRACE:
-                LOG.trace(msg);
-                break;
-            case DEBUG:
-                LOG.debug(msg);
-                break;
-            case INFO:
-                LOG.info(msg);
-                break;
-            case WARN:
-                LOG.warn(msg);
-                break;
-            case ERROR:
-                LOG.error(msg);
-                break;
-            default:
-                LOG.info(msg);
-                break;
-        }
-    }
-
-    @Override
-    public void activate() {
-        LOG.info("Start checking heartbeat...");
-        // reset the heartbeat first so the timer does not act on a stale timestamp from before activation
-        setHeartbeat();
-        heartBeatExecutorService.scheduleAtFixedRate(new SpoutHeartbeatTimerTask(this), 1, 1, TimeUnit.SECONDS);
-    }
-
-    @Override
-    public void deactivate() {
-        heartBeatExecutorService.shutdownNow();
-    }
-
-    private void setHeartbeat() {
-        lastHeartbeatTimestamp.set(System.currentTimeMillis());
-    }
-
-    private long getLastHeartbeat() {
-        return lastHeartbeatTimestamp.get();
-    }
-
-    private void die(Throwable exception) {
-        heartBeatExecutorService.shutdownNow();
-
-        LOG.error("Halting process: ShellSpout died.", exception);
-        _collector.reportError(exception);
-        _process.destroy();
-        System.exit(11);
-    }
-
-    private class SpoutHeartbeatTimerTask extends TimerTask {
-        private ShellSpout spout;
-
-        public SpoutHeartbeatTimerTask(ShellSpout spout) {
-            this.spout = spout;
-        }
-
-        @Override
-        public void run() {
-            long currentTimeMillis = System.currentTimeMillis();
-            long lastHeartbeat = getLastHeartbeat();
-
-            LOG.debug("current time : {}, last heartbeat : {}, worker timeout (ms) : {}",
-                    currentTimeMillis, lastHeartbeat, workerTimeoutMills);
-
-            if (currentTimeMillis - lastHeartbeat > workerTimeoutMills) {
-                spout.die(new RuntimeException("subprocess heartbeat timeout"));
-            }
-        }
-    }
-
-}

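ShellSpout is the Java half of the multilang protocol: it sends "next"/"ack"/"fail" messages to a subprocess and handles the "emit", "log", "sync", and "metrics" replies, killing the worker if the subprocess stops heartbeating. Concrete spouts subclass it and declare their output fields; a hedged sketch in which the script name is made up:

    import backtype.storm.spout.ShellSpout;
    import backtype.storm.topology.IRichSpout;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.tuple.Fields;
    import java.util.Map;

    public class PythonWordSpout extends ShellSpout implements IRichSpout {
        public PythonWordSpout() {
            // Runs `python wordspout.py`, which must speak the
            // multilang protocol over stdin/stdout.
            super("python", "wordspout.py");
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }

        public Map<String, Object> getComponentConfiguration() {
            return null;
        }
    }
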
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/spout/SleepSpoutWaitStrategy.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/spout/SleepSpoutWaitStrategy.java b/jstorm-client/src/main/java/backtype/storm/spout/SleepSpoutWaitStrategy.java
deleted file mode 100644
index 0aa7f64..0000000
--- a/jstorm-client/src/main/java/backtype/storm/spout/SleepSpoutWaitStrategy.java
+++ /dev/null
@@ -1,25 +0,0 @@
-package backtype.storm.spout;
-
-import backtype.storm.Config;
-import java.util.Map;
-
-public class SleepSpoutWaitStrategy implements ISpoutWaitStrategy {
-
-	long sleepMillis;
-
-	@Override
-	public void prepare(Map conf) {
-		sleepMillis = ((Number) conf
-				.get(Config.TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS))
-				.longValue();
-	}
-
-	@Override
-	public void emptyEmit(long streak) {
-		try {
-			Thread.sleep(sleepMillis);
-		} catch (InterruptedException e) {
-			throw new RuntimeException(e);
-		}
-	}
-}

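Both the strategy class and its sleep interval are ordinary configuration entries, so they can be set per topology. A hedged fragment, assuming Config.TOPOLOGY_SPOUT_WAIT_STRATEGY is the key the spout executor reads the strategy class name from:

    import backtype.storm.Config;

    Config conf = new Config();
    conf.put(Config.TOPOLOGY_SPOUT_WAIT_STRATEGY,
            "backtype.storm.spout.SleepSpoutWaitStrategy");
    conf.put(Config.TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS, 10);
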
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/spout/SpoutOutputCollector.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/spout/SpoutOutputCollector.java b/jstorm-client/src/main/java/backtype/storm/spout/SpoutOutputCollector.java
deleted file mode 100644
index 069fb99..0000000
--- a/jstorm-client/src/main/java/backtype/storm/spout/SpoutOutputCollector.java
+++ /dev/null
@@ -1,125 +0,0 @@
-package backtype.storm.spout;
-
-import backtype.storm.task.OutputCollector;
-import backtype.storm.utils.Utils;
-import java.util.List;
-
-/**
- * This output collector exposes the API for emitting tuples from an
- * {@link backtype.storm.topology.IRichSpout}. The main difference between this
- * output collector and {@link OutputCollector} for
- * {@link backtype.storm.topology.IRichBolt} is that spouts can tag messages
- * with ids so that they can be acked or failed later on. This is the Spout
- * portion of Storm's API to guarantee that each message is fully processed at
- * least once.
- */
-public class SpoutOutputCollector implements ISpoutOutputCollector {
-	ISpoutOutputCollector _delegate;
-
-	public SpoutOutputCollector(ISpoutOutputCollector delegate) {
-		_delegate = delegate;
-	}
-
-	/**
-	 * Emits a new tuple to the specified output stream with the given message
-	 * ID. When Storm detects that this tuple has been fully processed, or has
-	 * failed to be fully processed, the spout will receive an ack or fail
-	 * callback respectively with the messageId as long as the messageId was not
-	 * null. If the messageId was null, Storm will not track the tuple and no
-	 * callback will be received. The emitted values must be immutable.
-	 * 
-	 * @return the list of task ids that this tuple was sent to
-	 */
-	public List<Integer> emit(String streamId, List<Object> tuple,
-			Object messageId) {
-		return _delegate.emit(streamId, tuple, messageId);
-	}
-
-	/**
-	 * Emits a new tuple to the default output stream with the given message ID.
-	 * When Storm detects that this tuple has been fully processed, or has
-	 * failed to be fully processed, the spout will receive an ack or fail
-	 * callback respectively with the messageId as long as the messageId was not
-	 * null. If the messageId was null, Storm will not track the tuple and no
-	 * callback will be received. The emitted values must be immutable.
-	 * 
-	 * @return the list of task ids that this tuple was sent to
-	 */
-	public List<Integer> emit(List<Object> tuple, Object messageId) {
-		return emit(Utils.DEFAULT_STREAM_ID, tuple, messageId);
-	}
-
-	/**
-	 * Emits a tuple to the default output stream with a null message id. Storm
-	 * will not track this message so ack and fail will never be called for this
-	 * tuple. The emitted values must be immutable.
-	 */
-	public List<Integer> emit(List<Object> tuple) {
-		return emit(tuple, null);
-	}
-
-	/**
-	 * Emits a tuple to the specified output stream with a null message id.
-	 * Storm will not track this message so ack and fail will never be called
-	 * for this tuple. The emitted values must be immutable.
-	 */
-	public List<Integer> emit(String streamId, List<Object> tuple) {
-		return emit(streamId, tuple, null);
-	}
-
-	/**
-	 * Emits a tuple to the specified task on the specified output stream. This
-	 * output stream must have been declared as a direct stream, and the
-	 * specified task must use a direct grouping on this stream to receive the
-	 * message. The emitted values must be immutable.
-	 */
-	public void emitDirect(int taskId, String streamId, List<Object> tuple,
-			Object messageId) {
-		_delegate.emitDirect(taskId, streamId, tuple, messageId);
-	}
-
-	/**
-	 * Emits a tuple to the specified task on the default output stream. This
-	 * output stream must have been declared as a direct stream, and the
-	 * specified task must use a direct grouping on this stream to receive the
-	 * message. The emitted values must be immutable.
-	 */
-	public void emitDirect(int taskId, List<Object> tuple, Object messageId) {
-		emitDirect(taskId, Utils.DEFAULT_STREAM_ID, tuple, messageId);
-	}
-
-	/**
-	 * Emits a tuple to the specified task on the specified output stream. This
-	 * output stream must have been declared as a direct stream, and the
-	 * specified task must use a direct grouping on this stream to receive the
-	 * message. The emitted values must be immutable.
-	 * 
-	 * <p>
-	 * Because no message id is specified, Storm will not track this message so
-	 * ack and fail will never be called for this tuple.
-	 * </p>
-	 */
-	public void emitDirect(int taskId, String streamId, List<Object> tuple) {
-		emitDirect(taskId, streamId, tuple, null);
-	}
-
-	/**
-	 * Emits a tuple to the specified task on the default output stream. This
-	 * output stream must have been declared as a direct stream, and the
-	 * specified task must use a direct grouping on this stream to receive the
-	 * message. The emitted values must be immutable.
-	 * 
-	 * <p>
-	 * Because no message id is specified, Storm will not track this message so
-	 * ack and fail will never be called for this tuple.
-	 * </p>
-	 */
-	public void emitDirect(int taskId, List<Object> tuple) {
-		emitDirect(taskId, tuple, null);
-	}
-
-	@Override
-	public void reportError(Throwable error) {
-		_delegate.reportError(error);
-	}
-}

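The emit overloads above differ only in whether a stream and a message id are supplied; passing a message id is what opts a tuple into ack/fail tracking. A hedged fragment for inside a spout's nextTuple(), where `collector` is the SpoutOutputCollector saved in open() and Values is backtype.storm.tuple.Values:

    // Tracked: Storm will eventually call ack("msg-1") or fail("msg-1"):
    collector.emit(new Values("hello"), "msg-1");

    // Untracked: no message id, so no ack or fail callback ever fires:
    collector.emit(new Values("fire-and-forget"));
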
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/state/IStateSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/state/IStateSpout.java b/jstorm-client/src/main/java/backtype/storm/state/IStateSpout.java
deleted file mode 100644
index 2c8b300..0000000
--- a/jstorm-client/src/main/java/backtype/storm/state/IStateSpout.java
+++ /dev/null
@@ -1,15 +0,0 @@
-package backtype.storm.state;
-
-import backtype.storm.task.TopologyContext;
-import java.io.Serializable;
-import java.util.Map;
-
-public interface IStateSpout extends Serializable {
-	void open(Map conf, TopologyContext context);
-
-	void close();
-
-	void nextTuple(StateSpoutOutputCollector collector);
-
-	void synchronize(SynchronizeOutputCollector collector);
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/state/IStateSpoutOutputCollector.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/state/IStateSpoutOutputCollector.java b/jstorm-client/src/main/java/backtype/storm/state/IStateSpoutOutputCollector.java
deleted file mode 100644
index d26ed6b..0000000
--- a/jstorm-client/src/main/java/backtype/storm/state/IStateSpoutOutputCollector.java
+++ /dev/null
@@ -1,5 +0,0 @@
-package backtype.storm.state;
-
-public interface IStateSpoutOutputCollector extends ISynchronizeOutputCollector {
-	void remove(int streamId, Object id);
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/state/ISubscribedState.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/state/ISubscribedState.java b/jstorm-client/src/main/java/backtype/storm/state/ISubscribedState.java
deleted file mode 100644
index 4256a0a..0000000
--- a/jstorm-client/src/main/java/backtype/storm/state/ISubscribedState.java
+++ /dev/null
@@ -1,9 +0,0 @@
-package backtype.storm.state;
-
-import backtype.storm.tuple.Tuple;
-
-public interface ISubscribedState {
-	void set(Object id, Tuple tuple);
-
-	void remove(Object id);
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/state/ISynchronizeOutputCollector.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/state/ISynchronizeOutputCollector.java b/jstorm-client/src/main/java/backtype/storm/state/ISynchronizeOutputCollector.java
deleted file mode 100644
index 97a8a8e..0000000
--- a/jstorm-client/src/main/java/backtype/storm/state/ISynchronizeOutputCollector.java
+++ /dev/null
@@ -1,7 +0,0 @@
-package backtype.storm.state;
-
-import java.util.List;
-
-public interface ISynchronizeOutputCollector {
-	void add(int streamId, Object id, List<Object> tuple);
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/state/StateSpoutOutputCollector.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/state/StateSpoutOutputCollector.java b/jstorm-client/src/main/java/backtype/storm/state/StateSpoutOutputCollector.java
deleted file mode 100644
index 3156e95..0000000
--- a/jstorm-client/src/main/java/backtype/storm/state/StateSpoutOutputCollector.java
+++ /dev/null
@@ -1,11 +0,0 @@
-package backtype.storm.state;
-
-public class StateSpoutOutputCollector extends SynchronizeOutputCollector
-		implements IStateSpoutOutputCollector {
-
-	@Override
-	public void remove(int streamId, Object id) {
-		throw new UnsupportedOperationException("Not supported yet.");
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/state/SynchronizeOutputCollector.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/state/SynchronizeOutputCollector.java b/jstorm-client/src/main/java/backtype/storm/state/SynchronizeOutputCollector.java
deleted file mode 100644
index 9474fa2..0000000
--- a/jstorm-client/src/main/java/backtype/storm/state/SynchronizeOutputCollector.java
+++ /dev/null
@@ -1,12 +0,0 @@
-package backtype.storm.state;
-
-import java.util.List;
-
-public class SynchronizeOutputCollector implements ISynchronizeOutputCollector {
-
-	@Override
-	public void add(int streamId, Object id, List<Object> tuple) {
-		throw new UnsupportedOperationException("Not supported yet.");
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/task/GeneralTopologyContext.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/task/GeneralTopologyContext.java b/jstorm-client/src/main/java/backtype/storm/task/GeneralTopologyContext.java
deleted file mode 100644
index 4817fb4..0000000
--- a/jstorm-client/src/main/java/backtype/storm/task/GeneralTopologyContext.java
+++ /dev/null
@@ -1,206 +0,0 @@
-package backtype.storm.task;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.json.simple.JSONAware;
-
-import backtype.storm.Config;
-import backtype.storm.Constants;
-import backtype.storm.generated.ComponentCommon;
-import backtype.storm.generated.GlobalStreamId;
-import backtype.storm.generated.Grouping;
-import backtype.storm.generated.StormTopology;
-import backtype.storm.tuple.Fields;
-import backtype.storm.utils.ThriftTopologyUtils;
-import backtype.storm.utils.Utils;
-
-public class GeneralTopologyContext implements JSONAware {
-	private StormTopology _topology;
-	private Map<Integer, String> _taskToComponent;
-	private Map<String, List<Integer>> _componentToTasks;
-	private Map<String, Map<String, Fields>> _componentToStreamToFields;
-	private String _topologyId;
-	protected Map _stormConf;
-
-	// pass in componentToSortedTasks for the case of running tons of tasks in
-	// a single executor
-	public GeneralTopologyContext(StormTopology topology, Map stormConf,
-			Map<Integer, String> taskToComponent,
-			Map<String, List<Integer>> componentToSortedTasks,
-			Map<String, Map<String, Fields>> componentToStreamToFields,
-			String topologyId) {
-		_topology = topology;
-		_stormConf = stormConf;
-		_taskToComponent = taskToComponent;
-		_topologyId = topologyId;
-		_componentToTasks = componentToSortedTasks;
-		_componentToStreamToFields = componentToStreamToFields;
-	}
-
-	/**
-	 * Gets the unique id assigned to this topology. The id is the storm name
-	 * with a unique nonce appended to it.
-	 * 
-	 * @return the topology id
-	 */
-	public String getTopologyId() {
-		return _topologyId;
-	}
-	
-	/**
-	 * Please use getTopologyId() instead.
-	 * 
-	 * @return the topology id
-	 */
-	@Deprecated
-	public String getStormId() {
-		return _topologyId;
-	}
-
-	/**
-	 * Gets the Thrift object representing the topology.
-	 * 
-	 * @return the Thrift definition representing the topology
-	 */
-	public StormTopology getRawTopology() {
-		return _topology;
-	}
-
-	/**
-	 * Gets the component id for the specified task id. The component id maps to
-	 * a component id specified for a Spout or Bolt in the topology definition.
-	 * 
-	 * @param taskId
-	 *            the task id
-	 * @return the component id for the input task id
-	 */
-	public String getComponentId(int taskId) {
-		if (taskId == Constants.SYSTEM_TASK_ID) {
-			return Constants.SYSTEM_COMPONENT_ID;
-		} else {
-			return _taskToComponent.get(taskId);
-		}
-	}
-
-	/**
-	 * Gets the set of streams declared for the specified component.
-	 */
-	public Set<String> getComponentStreams(String componentId) {
-		return getComponentCommon(componentId).get_streams().keySet();
-	}
-
-	/**
-	 * Gets the task ids allocated for the given component id. The task ids are
-	 * always returned in ascending order.
-	 */
-	public List<Integer> getComponentTasks(String componentId) {
-		List<Integer> ret = _componentToTasks.get(componentId);
-		if (ret == null)
-			return new ArrayList<Integer>();
-		else
-			return new ArrayList<Integer>(ret);
-	}
-
-	/**
-	 * Gets the declared output fields for the specified component/stream.
-	 */
-	public Fields getComponentOutputFields(String componentId, String streamId) {
-		Fields ret = _componentToStreamToFields.get(componentId).get(streamId);
-		if (ret == null) {
-			throw new IllegalArgumentException(
-					"No output fields defined for component:stream "
-							+ componentId + ":" + streamId);
-		}
-		return ret;
-	}
-
-	/**
-	 * Gets the declared output fields for the specified global stream id.
-	 */
-	public Fields getComponentOutputFields(GlobalStreamId id) {
-		return getComponentOutputFields(id.get_componentId(), id.get_streamId());
-	}
-
-	/**
-	 * Gets the declared inputs to the specified component.
-	 * 
-	 * @return A map from subscribed component/stream to the grouping subscribed
-	 *         with.
-	 */
-	public Map<GlobalStreamId, Grouping> getSources(String componentId) {
-		return getComponentCommon(componentId).get_inputs();
-	}
-
-	/**
-	 * Gets information about who is consuming the outputs of the specified
-	 * component, and how.
-	 * 
-	 * @return Map from stream id to component id to the Grouping used.
-	 */
-	public Map<String, Map<String, Grouping>> getTargets(String componentId) {
-		Map<String, Map<String, Grouping>> ret = new HashMap<String, Map<String, Grouping>>();
-		for (String otherComponentId : getComponentIds()) {
-			Map<GlobalStreamId, Grouping> inputs = getComponentCommon(
-					otherComponentId).get_inputs();
-			for (GlobalStreamId id : inputs.keySet()) {
-				if (id.get_componentId().equals(componentId)) {
-					Map<String, Grouping> curr = ret.get(id.get_streamId());
-					if (curr == null)
-						curr = new HashMap<String, Grouping>();
-					curr.put(otherComponentId, inputs.get(id));
-					ret.put(id.get_streamId(), curr);
-				}
-			}
-		}
-		return ret;
-	}
-
-	@Override
-	public String toJSONString() {
-		Map obj = new HashMap();
-		obj.put("task->component", _taskToComponent);
-		// TODO: jsonify StormTopology
-		// at the minimum should send source info
-		return Utils.to_json(obj);
-	}
-
-	/**
-	 * Gets a map from task id to component id.
-	 */
-	public Map<Integer, String> getTaskToComponent() {
-		return _taskToComponent;
-	}
-
-	/**
-	 * Gets the set of all component ids in this topology.
-	 */
-	public Set<String> getComponentIds() {
-		return ThriftTopologyUtils.getComponentIds(getRawTopology());
-	}
-
-	public ComponentCommon getComponentCommon(String componentId) {
-		return ThriftTopologyUtils.getComponentCommon(getRawTopology(),
-				componentId);
-	}
-
-	public int maxTopologyMessageTimeout() {
-		Integer max = Utils.getInt(_stormConf
-				.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS));
-		for (String spout : getRawTopology().get_spouts().keySet()) {
-			ComponentCommon common = getComponentCommon(spout);
-			String jsonConf = common.get_json_conf();
-			if (jsonConf != null) {
-				Map conf = (Map) Utils.from_json(jsonConf);
-				Object comp = conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS);
-				if (comp != null) {
-					max = Math.max(Utils.getInt(comp), max);
-				}
-			}
-		}
-		return max;
-	}
-}
\ No newline at end of file
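
For reference, a hypothetical helper showing how the read-only query methods above might be used; bolts and spouts actually receive a TopologyContext, which inherits all of these. The class and output below are illustrative only:

import java.util.List;
import java.util.Map;

import backtype.storm.generated.Grouping;
import backtype.storm.task.GeneralTopologyContext;

public class ContextInspector {
    public static void dump(GeneralTopologyContext context, String componentId) {
        // Task ids for a component are always returned in ascending order.
        List<Integer> tasks = context.getComponentTasks(componentId);
        System.out.println(componentId + " runs as tasks " + tasks);

        // Stream id -> (consuming component id -> grouping used).
        Map<String, Map<String, Grouping>> targets = context.getTargets(componentId);
        for (Map.Entry<String, Map<String, Grouping>> e : targets.entrySet()) {
            System.out.println("stream " + e.getKey() + " feeds " + e.getValue().keySet());
        }
    }
}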

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/task/IBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/task/IBolt.java b/jstorm-client/src/main/java/backtype/storm/task/IBolt.java
deleted file mode 100644
index bfffa14..0000000
--- a/jstorm-client/src/main/java/backtype/storm/task/IBolt.java
+++ /dev/null
@@ -1,98 +0,0 @@
-package backtype.storm.task;
-
-import backtype.storm.tuple.Tuple;
-import java.util.Map;
-import java.io.Serializable;
-
-/**
- * An IBolt represents a component that takes tuples as input and produces
- * tuples as output. An IBolt can do everything from filtering to joining to
- * functions to aggregations. It does not have to process a tuple immediately
- * and may hold onto tuples to process later.
- * 
- * <p>
- * A bolt's lifecycle is as follows:
- * </p>
- * 
- * <p>
- * IBolt object created on client machine. The IBolt is serialized into the
- * topology (using Java serialization) and submitted to the master machine of
- * the cluster (Nimbus). Nimbus then launches workers which deserialize the
- * object, call prepare on it, and then start processing tuples.
- * </p>
- * 
- * <p>
-	 * If you want to parameterize an IBolt, you should set the parameters through
- * its constructor and save the parameterization state as instance variables
- * (which will then get serialized and shipped to every task executing this bolt
- * across the cluster).
- * </p>
- * 
- * <p>
- * When defining bolts in Java, you should use the IRichBolt interface which
- * adds necessary methods for using the Java TopologyBuilder API.
- * </p>
- */
-public interface IBolt extends Serializable {
-	/**
-	 * Called when a task for this component is initialized within a worker on
-	 * the cluster. It provides the bolt with the environment in which the bolt
-	 * executes.
-	 * 
-	 * <p>
-	 * This includes the following:
-	 * </p>
-	 * 
-	 * @param stormConf
-	 *            The Storm configuration for this bolt. This is the
-	 *            configuration provided to the topology merged in with cluster
-	 *            configuration on this machine.
-	 * @param context
-	 *            This object can be used to get information about this task's
-	 *            place within the topology, including the task id and component
-	 *            id of this task, input and output information, etc.
-	 * @param collector
-	 *            The collector is used to emit tuples from this bolt. Tuples
-	 *            can be emitted at any time, including the prepare and cleanup
-	 *            methods. The collector is thread-safe and should be saved as
-	 *            an instance variable of this bolt object.
-	 */
-	void prepare(Map stormConf, TopologyContext context,
-			OutputCollector collector);
-
-	/**
-	 * Process a single tuple of input. The Tuple object contains metadata on it
-	 * about which component/stream/task it came from. The values of the Tuple
-	 * can be accessed using Tuple#getValue. The IBolt does not have to process
-	 * the Tuple immediately. It is perfectly fine to hang onto a tuple and
-	 * process it later (for instance, to do an aggregation or join).
-	 * 
-	 * <p>
-	 * Tuples should be emitted using the OutputCollector provided through the
-	 * prepare method. It is required that all input tuples are acked or failed
-	 * at some point using the OutputCollector. Otherwise, Storm will be unable
-	 * to determine when tuples coming off the spouts have been completed.
-	 * </p>
-	 * 
-	 * <p>
-	 * For the common case of acking an input tuple at the end of the execute
-	 * method, see IBasicBolt which automates this.
-	 * </p>
-	 * 
-	 * @param input
-	 *            The input tuple to be processed.
-	 */
-	void execute(Tuple input);
-
-	/**
-	 * Called when an IBolt is going to be shut down. There is no guarantee that
-	 * cleanup will be called, because the supervisor kill -9's worker processes
-	 * on the cluster.
-	 * 
-	 * <p>
-	 * The one context where cleanup is guaranteed to be called is when a
-	 * topology is killed when running Storm in local mode.
-	 * </p>
-	 */
-	void cleanup();
-}
\ No newline at end of file
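
A minimal illustrative IBolt tying the lifecycle together: it saves the collector in prepare, anchors each output on its input, and acks. The names are made up, and it assumes output fields are declared elsewhere (IRichBolt adds declareOutputFields for that):

import java.util.Map;

import backtype.storm.task.IBolt;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Tuple;

public class EchoBolt implements IBolt {
    private OutputCollector _collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context,
            OutputCollector collector) {
        // The collector is thread-safe; keep it as an instance variable.
        _collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        // Anchor on the input so downstream failures trigger a replay,
        // then ack so the acking framework can retire the spout tuple.
        _collector.emit(input, input.getValues());
        _collector.ack(input);
    }

    @Override
    public void cleanup() {
        // Not guaranteed to run; the supervisor may kill -9 the worker.
    }
}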

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/task/IErrorReporter.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/task/IErrorReporter.java b/jstorm-client/src/main/java/backtype/storm/task/IErrorReporter.java
deleted file mode 100644
index ae04710..0000000
--- a/jstorm-client/src/main/java/backtype/storm/task/IErrorReporter.java
+++ /dev/null
@@ -1,5 +0,0 @@
-package backtype.storm.task;
-
-public interface IErrorReporter {
-	void reportError(Throwable error);
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/task/IMetricsContext.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/task/IMetricsContext.java b/jstorm-client/src/main/java/backtype/storm/task/IMetricsContext.java
deleted file mode 100644
index d4ace69..0000000
--- a/jstorm-client/src/main/java/backtype/storm/task/IMetricsContext.java
+++ /dev/null
@@ -1,18 +0,0 @@
-package backtype.storm.task;
-
-import backtype.storm.metric.api.CombinedMetric;
-import backtype.storm.metric.api.ICombiner;
-import backtype.storm.metric.api.IMetric;
-import backtype.storm.metric.api.IReducer;
-import backtype.storm.metric.api.ReducedMetric;
-
-public interface IMetricsContext {
-	<T extends IMetric> T registerMetric(String name, T metric,
-			int timeBucketSizeInSecs);
-
-	ReducedMetric registerMetric(String name, IReducer reducer,
-			int timeBucketSizeInSecs);
-
-	CombinedMetric registerMetric(String name, ICombiner combiner,
-			int timeBucketSizeInSecs);
-}
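
As a sketch of typical usage: TopologyContext implements IMetricsContext, so a bolt can register metrics in prepare. The metric names and the 60-second bucket below are illustrative; CountMetric and MeanReducer are stock implementations from backtype.storm.metric.api:

import backtype.storm.metric.api.CountMetric;
import backtype.storm.metric.api.MeanReducer;
import backtype.storm.metric.api.ReducedMetric;
import backtype.storm.task.IMetricsContext;

public class MetricsRegistration {
    private CountMetric _executes;
    private ReducedMetric _latencyMs;

    public void register(IMetricsContext context) {
        // Each metric is published (and reset) every 60 seconds.
        _executes = context.registerMetric("execute-count",
                new CountMetric(), 60);
        _latencyMs = context.registerMetric("execute-latency-ms",
                new MeanReducer(), 60);
    }
}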

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/task/IOutputCollector.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/task/IOutputCollector.java b/jstorm-client/src/main/java/backtype/storm/task/IOutputCollector.java
deleted file mode 100644
index dcf2217..0000000
--- a/jstorm-client/src/main/java/backtype/storm/task/IOutputCollector.java
+++ /dev/null
@@ -1,20 +0,0 @@
-package backtype.storm.task;
-
-import backtype.storm.tuple.Tuple;
-import java.util.Collection;
-import java.util.List;
-
-public interface IOutputCollector extends IErrorReporter {
-	/**
-	 * Returns the task ids that received the tuples.
-	 */
-	List<Integer> emit(String streamId, Collection<Tuple> anchors,
-			List<Object> tuple);
-
-	void emitDirect(int taskId, String streamId, Collection<Tuple> anchors,
-			List<Object> tuple);
-
-	void ack(Tuple input);
-
-	void fail(Tuple input);
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/task/OutputCollector.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/task/OutputCollector.java b/jstorm-client/src/main/java/backtype/storm/task/OutputCollector.java
deleted file mode 100644
index 9cee3a8..0000000
--- a/jstorm-client/src/main/java/backtype/storm/task/OutputCollector.java
+++ /dev/null
@@ -1,245 +0,0 @@
-package backtype.storm.task;
-
-import backtype.storm.tuple.Tuple;
-import backtype.storm.utils.Utils;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-
-/**
- * This output collector exposes the API for emitting tuples from an IRichBolt.
- * This is the core API for emitting tuples. For a simpler API, and a more
- * restricted form of stream processing, see IBasicBolt and
- * BasicOutputCollector.
- */
-public class OutputCollector implements IOutputCollector {
-	private IOutputCollector _delegate;
-
-	public OutputCollector(IOutputCollector delegate) {
-		_delegate = delegate;
-	}
-
-	/**
-	 * Emits a new tuple to a specific stream with a single anchor. The emitted
-	 * values must be immutable.
-	 * 
-	 * @param streamId
-	 *            the stream to emit to
-	 * @param anchor
-	 *            the tuple to anchor to
-	 * @param tuple
-	 *            the new output tuple from this bolt
-	 * @return the list of task ids that this new tuple was sent to
-	 */
-	public List<Integer> emit(String streamId, Tuple anchor, List<Object> tuple) {
-		return emit(streamId, Arrays.asList(anchor), tuple);
-	}
-
-	/**
-	 * Emits a new unanchored tuple to the specified stream. Because it's
-	 * unanchored, if a failure happens downstream, this new tuple won't affect
-	 * whether any spout tuples are considered failed or not. The emitted values
-	 * must be immutable.
-	 * 
-	 * @param streamId
-	 *            the stream to emit to
-	 * @param tuple
-	 *            the new output tuple from this bolt
-	 * @return the list of task ids that this new tuple was sent to
-	 */
-	public List<Integer> emit(String streamId, List<Object> tuple) {
-		return emit(streamId, (List) null, tuple);
-	}
-
-	/**
-	 * Emits a new tuple to the default stream anchored on a group of input
-	 * tuples. The emitted values must be immutable.
-	 * 
-	 * @param anchors
-	 *            the tuples to anchor to
-	 * @param tuple
-	 *            the new output tuple from this bolt
-	 * @return the list of task ids that this new tuple was sent to
-	 */
-	public List<Integer> emit(Collection<Tuple> anchors, List<Object> tuple) {
-		return emit(Utils.DEFAULT_STREAM_ID, anchors, tuple);
-	}
-
-	/**
-	 * Emits a new tuple to the default stream anchored on a single tuple. The
-	 * emitted values must be immutable.
-	 * 
-	 * @param anchor
-	 *            the tuple to anchor to
-	 * @param tuple
-	 *            the new output tuple from this bolt
-	 * @return the list of task ids that this new tuple was sent to
-	 */
-	public List<Integer> emit(Tuple anchor, List<Object> tuple) {
-		return emit(Utils.DEFAULT_STREAM_ID, anchor, tuple);
-	}
-
-	/**
-	 * Emits a new unanchored tuple to the default stream. Because it's
-	 * unanchored, if a failure happens downstream, this new tuple won't affect
-	 * whether any spout tuples are considered failed or not. The emitted values
-	 * must be immutable.
-	 * 
-	 * @param tuple
-	 *            the new output tuple from this bolt
-	 * @return the list of task ids that this new tuple was sent to
-	 */
-	public List<Integer> emit(List<Object> tuple) {
-		return emit(Utils.DEFAULT_STREAM_ID, tuple);
-	}
-
-	/**
-	 * Emits a tuple directly to the specified task id on the specified stream.
-	 * If the target bolt does not subscribe to this bolt using a direct
-	 * grouping, the tuple will not be sent. If the specified output stream is
-	 * not declared as direct, or the target bolt subscribes with a non-direct
-	 * grouping, an error will occur at runtime. The emitted values must be
-	 * immutable.
-	 * 
-	 * @param taskId
-	 *            the taskId to send the new tuple to
-	 * @param streamId
-	 *            the stream to send the tuple on. It must be declared as a
-	 *            direct stream in the topology definition.
-	 * @param anchor
-	 *            the tuple to anchor to
-	 * @param tuple
-	 *            the new output tuple from this bolt
-	 */
-	public void emitDirect(int taskId, String streamId, Tuple anchor,
-			List<Object> tuple) {
-		emitDirect(taskId, streamId, Arrays.asList(anchor), tuple);
-	}
-
-	/**
-	 * Emits a tuple directly to the specified task id on the specified stream.
-	 * If the target bolt does not subscribe to this bolt using a direct
-	 * grouping, the tuple will not be sent. If the specified output stream is
-	 * not declared as direct, or the target bolt subscribes with a non-direct
-	 * grouping, an error will occur at runtime. Note that this method does not
-	 * use anchors, so downstream failures won't affect the failure status of
-	 * any spout tuples. The emitted values must be immutable.
-	 * 
-	 * @param taskId
-	 *            the taskId to send the new tuple to
-	 * @param streamId
-	 *            the stream to send the tuple on. It must be declared as a
-	 *            direct stream in the topology definition.
-	 * @param tuple
-	 *            the new output tuple from this bolt
-	 */
-	public void emitDirect(int taskId, String streamId, List<Object> tuple) {
-		emitDirect(taskId, streamId, (List) null, tuple);
-	}
-
-	/**
-	 * Emits a tuple directly to the specified task id on the default stream. If
-	 * the target bolt does not subscribe to this bolt using a direct grouping,
-	 * the tuple will not be sent. If the specified output stream is not
-	 * declared as direct, or the target bolt subscribes with a non-direct
-	 * grouping, an error will occur at runtime. The emitted values must be
-	 * immutable.
-	 * 
-	 * <p>
-	 * The default stream must be declared as direct in the topology definition.
-	 * See OutputDeclarer#declare for how this is done when defining topologies
-	 * in Java.
-	 * </p>
-	 * 
-	 * @param taskId
-	 *            the taskId to send the new tuple to
-	 * @param anchors
-	 *            the tuples to anchor to
-	 * @param tuple
-	 *            the new output tuple from this bolt
-	 */
-	public void emitDirect(int taskId, Collection<Tuple> anchors,
-			List<Object> tuple) {
-		emitDirect(taskId, Utils.DEFAULT_STREAM_ID, anchors, tuple);
-	}
-
-	/**
-	 * Emits a tuple directly to the specified task id on the default stream. If
-	 * the target bolt does not subscribe to this bolt using a direct grouping,
-	 * the tuple will not be sent. If the specified output stream is not
-	 * declared as direct, or the target bolt subscribes with a non-direct
-	 * grouping, an error will occur at runtime. The emitted values must be
-	 * immutable.
-	 * 
-	 * <p>
-	 * The default stream must be declared as direct in the topology definition.
-	 * See OutputDeclarer#declare for how this is done when defining topologies
-	 * in Java.
-	 * </p>
-	 * 
-	 * @param taskId
-	 *            the taskId to send the new tuple to
-	 * @param anchor
-	 *            the tuple to anchor to
-	 * @param tuple
-	 *            the new output tuple from this bolt
-	 */
-	public void emitDirect(int taskId, Tuple anchor, List<Object> tuple) {
-		emitDirect(taskId, Utils.DEFAULT_STREAM_ID, anchor, tuple);
-	}
-
-	/**
-	 * Emits a tuple directly to the specified task id on the default stream. If
-	 * the target bolt does not subscribe to this bolt using a direct grouping,
-	 * the tuple will not be sent. If the specified output stream is not
-	 * declared as direct, or the target bolt subscribes with a non-direct
-	 * grouping, an error will occur at runtime. The emitted values must be
-	 * immutable.
-	 * 
-	 * <p>
-	 * The default stream must be declared as direct in the topology definition.
-	 * See OutputDeclarer#declare for how this is done when defining topologies
-	 * in Java.
-	 * </p>
-	 * 
-	 * <p>
-	 * Note that this method does not use anchors, so downstream failures won't
-	 * affect the failure status of any spout tuples.
-	 * </p>
-	 * 
-	 * @param taskId
-	 *            the taskId to send the new tuple to
-	 * @param tuple
-	 *            the new output tuple from this bolt
-	 */
-	public void emitDirect(int taskId, List<Object> tuple) {
-		emitDirect(taskId, Utils.DEFAULT_STREAM_ID, tuple);
-	}
-
-	@Override
-	public List<Integer> emit(String streamId, Collection<Tuple> anchors,
-			List<Object> tuple) {
-		return _delegate.emit(streamId, anchors, tuple);
-	}
-
-	@Override
-	public void emitDirect(int taskId, String streamId,
-			Collection<Tuple> anchors, List<Object> tuple) {
-		_delegate.emitDirect(taskId, streamId, anchors, tuple);
-	}
-
-	@Override
-	public void ack(Tuple input) {
-		_delegate.ack(input);
-	}
-
-	@Override
-	public void fail(Tuple input) {
-		_delegate.fail(input);
-	}
-
-	@Override
-	public void reportError(Throwable error) {
-		_delegate.reportError(error);
-	}
-}
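
To make the anchoring distinction above concrete, a hypothetical fragment from a bolt's execute(); the stream ids are illustrative and would have to be declared by the bolt:

import java.util.List;

import backtype.storm.task.OutputCollector;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class EmitPatterns {
    public static void emitBoth(OutputCollector collector, Tuple input) {
        // Anchored: a downstream failure fails the spout tuple behind input.
        List<Integer> tasks = collector.emit("tracked", input,
                new Values("word", 1));
        System.out.println("sent to tasks " + tasks);

        // Unanchored: fire-and-forget, invisible to the acking framework.
        collector.emit("untracked", new Values("word", 2));

        // Every input must eventually be acked or failed.
        collector.ack(input);
    }
}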