You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/11/05 21:41:06 UTC
[27/60] [abbrv] [partial] storm git commit: Release 2.0.4-SNAPSHOT
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/serialization/KryoTupleDeserializer.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/serialization/KryoTupleDeserializer.java b/jstorm-client/src/main/java/backtype/storm/serialization/KryoTupleDeserializer.java
deleted file mode 100644
index 12d9193..0000000
--- a/jstorm-client/src/main/java/backtype/storm/serialization/KryoTupleDeserializer.java
+++ /dev/null
@@ -1,90 +0,0 @@
-package backtype.storm.serialization;
-
-import backtype.storm.task.GeneralTopologyContext;
-import backtype.storm.tuple.MessageId;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.TupleImplExt;
-
-import com.esotericsoftware.kryo.io.Input;
-
-import java.io.IOException;
-import java.net.URLClassLoader;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.log4j.Logger;
-
-public class KryoTupleDeserializer implements ITupleDeserializer {
- private static final Logger LOG = Logger.getLogger(KryoTupleDeserializer.class);
-
- public static final boolean USE_RAW_PACKET = true;
-
- GeneralTopologyContext _context;
- KryoValuesDeserializer _kryo;
- SerializationFactory.IdDictionary _ids;
- Input _kryoInput;
-
- public KryoTupleDeserializer(final Map conf,
- final GeneralTopologyContext context) {
- _kryo = new KryoValuesDeserializer(conf);
- _context = context;
- _ids = new SerializationFactory.IdDictionary(context.getRawTopology());
- _kryoInput = new Input(1);
- }
-
- public Tuple deserialize(byte[] ser) {
-
- int targetTaskId = 0;
- int taskId = 0;
- int streamId = 0;
- String componentName = null;
- String streamName = null;
- MessageId id = null;
-
- try {
-
- _kryoInput.setBuffer(ser);
-
- targetTaskId = _kryoInput.readInt();
- taskId = _kryoInput.readInt(true);
- streamId = _kryoInput.readInt(true);
- componentName = _context.getComponentId(taskId);
- streamName = _ids.getStreamName(componentName, streamId);
- id = MessageId.deserialize(_kryoInput);
- List<Object> values = _kryo.deserializeFrom(_kryoInput);
- TupleImplExt tuple = new TupleImplExt(_context, values, taskId,
- streamName, id);
- tuple.setTargetTaskId(targetTaskId);
- return tuple;
- } catch (Throwable e) {
- StringBuilder sb = new StringBuilder();
-
- sb.append("Deserialize error:");
- sb.append("targetTaskId:").append(targetTaskId);
- sb.append(",taskId:").append(taskId);
- sb.append(",streamId:").append(streamId);
- sb.append(",componentName:").append(componentName);
- sb.append(",streamName:").append(streamName);
- sb.append(",MessageId").append(id);
-
- LOG.info(sb.toString(), e );
- throw new RuntimeException(e);
- }
- }
-
- /**
- * just get target taskId
- *
- * @param ser
- * @return
- */
- public static int deserializeTaskId(byte[] ser) {
- Input _kryoInput = new Input(1);
-
- _kryoInput.setBuffer(ser);
-
- int targetTaskId = _kryoInput.readInt();
-
- return targetTaskId;
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/serialization/KryoTupleSerializer.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/serialization/KryoTupleSerializer.java b/jstorm-client/src/main/java/backtype/storm/serialization/KryoTupleSerializer.java
deleted file mode 100644
index e04b145..0000000
--- a/jstorm-client/src/main/java/backtype/storm/serialization/KryoTupleSerializer.java
+++ /dev/null
@@ -1,67 +0,0 @@
-package backtype.storm.serialization;
-
-import backtype.storm.task.GeneralTopologyContext;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.TupleExt;
-
-import com.esotericsoftware.kryo.io.Output;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Map;
-
-public class KryoTupleSerializer implements ITupleSerializer {
- KryoValuesSerializer _kryo;
- SerializationFactory.IdDictionary _ids;
- Output _kryoOut;
-
- public KryoTupleSerializer(final Map conf,
- final GeneralTopologyContext context) {
- _kryo = new KryoValuesSerializer(conf);
- _kryoOut = new Output(2000, 2000000000);
- _ids = new SerializationFactory.IdDictionary(context.getRawTopology());
- }
-
- /**
- * @@@ in the furture, it will skill serialize 'targetTask' through check
- * some flag
- * @see backtype.storm.serialization.ITupleSerializer#serialize(int,
- * backtype.storm.tuple.Tuple)
- */
- public byte[] serialize(Tuple tuple) {
- try {
-
- _kryoOut.clear();
- if (tuple instanceof TupleExt) {
- _kryoOut.writeInt(((TupleExt) tuple).getTargetTaskId());
- }
-
- _kryoOut.writeInt(tuple.getSourceTask(), true);
- _kryoOut.writeInt(
- _ids.getStreamId(tuple.getSourceComponent(),
- tuple.getSourceStreamId()), true);
- tuple.getMessageId().serialize(_kryoOut);
- _kryo.serializeInto(tuple.getValues(), _kryoOut);
- return _kryoOut.toBytes();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- public static byte[] serialize(int targetTask) {
- ByteBuffer buff = ByteBuffer.allocate((Integer.SIZE / 8));
- buff.putInt(targetTask);
- byte[] rtn = buff.array();
- return rtn;
- }
-
- // public long crc32(Tuple tuple) {
- // try {
- // CRC32OutputStream hasher = new CRC32OutputStream();
- // _kryo.serializeInto(tuple.getValues(), hasher);
- // return hasher.getValue();
- // } catch (IOException e) {
- // throw new RuntimeException(e);
- // }
- // }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/serialization/KryoValuesDeserializer.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/serialization/KryoValuesDeserializer.java b/jstorm-client/src/main/java/backtype/storm/serialization/KryoValuesDeserializer.java
deleted file mode 100644
index c1f3a80..0000000
--- a/jstorm-client/src/main/java/backtype/storm/serialization/KryoValuesDeserializer.java
+++ /dev/null
@@ -1,39 +0,0 @@
-package backtype.storm.serialization;
-
-import java.io.IOException;
-import java.net.URLClassLoader;
-import java.util.List;
-import java.util.Map;
-
-import backtype.storm.utils.ListDelegate;
-
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.io.Input;
-
-public class KryoValuesDeserializer {
-
- Kryo _kryo;
- Input _kryoInput;
-
- public KryoValuesDeserializer(Map conf) {
- this._kryo = SerializationFactory.getKryo(conf);
- this._kryoInput = new Input(1);
- }
-
- public List<Object> deserializeFrom(Input input) {
- ListDelegate delegate = (ListDelegate) _kryo.readObject(input,
- ListDelegate.class);
- return delegate.getDelegate();
- }
-
- public List<Object> deserialize(byte[] ser) throws IOException {
- _kryoInput.setBuffer(ser);
- return deserializeFrom(_kryoInput);
- }
-
- public Object deserializeObject(byte[] ser) throws IOException {
- _kryoInput.setBuffer(ser);
- return _kryo.readClassAndObject(_kryoInput);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/serialization/KryoValuesSerializer.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/serialization/KryoValuesSerializer.java b/jstorm-client/src/main/java/backtype/storm/serialization/KryoValuesSerializer.java
deleted file mode 100644
index 6072282..0000000
--- a/jstorm-client/src/main/java/backtype/storm/serialization/KryoValuesSerializer.java
+++ /dev/null
@@ -1,45 +0,0 @@
-package backtype.storm.serialization;
-
-import backtype.storm.utils.ListDelegate;
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.io.Output;
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
-public class KryoValuesSerializer {
- Kryo _kryo;
- ListDelegate _delegate;
- Output _kryoOut;
-
- public KryoValuesSerializer(Map conf) {
- _kryo = SerializationFactory.getKryo(conf);
- _delegate = new ListDelegate();
- _kryoOut = new Output(2000, 2000000000);
- }
-
- public void serializeInto(List<Object> values, Output out)
- throws IOException {
- // this ensures that list of values is always written the same way,
- // regardless
- // of whether it's a java collection or one of clojure's persistent
- // collections
- // (which have different serializers)
- // Doing this lets us deserialize as ArrayList and avoid writing the
- // class here
- _delegate.setDelegate(values);
- _kryo.writeObject(out, _delegate);
- }
-
- public byte[] serialize(List<Object> values) throws IOException {
- _kryoOut.clear();
- serializeInto(values, _kryoOut);
- return _kryoOut.toBytes();
- }
-
- public byte[] serializeObject(Object obj) {
- _kryoOut.clear();
- _kryo.writeClassAndObject(_kryoOut, obj);
- return _kryoOut.toBytes();
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/serialization/SerializableSerializer.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/serialization/SerializableSerializer.java b/jstorm-client/src/main/java/backtype/storm/serialization/SerializableSerializer.java
deleted file mode 100644
index 4fcaf02..0000000
--- a/jstorm-client/src/main/java/backtype/storm/serialization/SerializableSerializer.java
+++ /dev/null
@@ -1,46 +0,0 @@
-package backtype.storm.serialization;
-
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.Serializer;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-
-import org.apache.commons.io.input.ClassLoaderObjectInputStream;
-
-public class SerializableSerializer extends Serializer<Object> {
-
- @Override
- public void write(Kryo kryo, Output output, Object object) {
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- try {
- ObjectOutputStream oos = new ObjectOutputStream(bos);
- oos.writeObject(object);
- oos.flush();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- byte[] ser = bos.toByteArray();
- output.writeInt(ser.length);
- output.writeBytes(ser);
- }
-
- @Override
- public Object read(Kryo kryo, Input input, Class c) {
- int len = input.readInt();
- byte[] ser = new byte[len];
- input.readBytes(ser);
- ByteArrayInputStream bis = new ByteArrayInputStream(ser);
- try {
- ClassLoaderObjectInputStream ois = new ClassLoaderObjectInputStream(
- kryo.getClassLoader(), bis);
- return ois.readObject();
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/serialization/SerializationFactory.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/serialization/SerializationFactory.java b/jstorm-client/src/main/java/backtype/storm/serialization/SerializationFactory.java
deleted file mode 100644
index 88f7803..0000000
--- a/jstorm-client/src/main/java/backtype/storm/serialization/SerializationFactory.java
+++ /dev/null
@@ -1,242 +0,0 @@
-package backtype.storm.serialization;
-
-import backtype.storm.Config;
-import backtype.storm.generated.ComponentCommon;
-import backtype.storm.generated.StormTopology;
-import backtype.storm.serialization.types.ArrayListSerializer;
-import backtype.storm.serialization.types.ListDelegateSerializer;
-import backtype.storm.serialization.types.HashMapSerializer;
-import backtype.storm.serialization.types.HashSetSerializer;
-import backtype.storm.transactional.TransactionAttempt;
-import backtype.storm.tuple.Values;
-import backtype.storm.utils.ListDelegate;
-import backtype.storm.utils.Utils;
-import backtype.storm.utils.WorkerClassLoader;
-import carbonite.JavaBridge;
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.Serializer;
-import com.esotericsoftware.kryo.serializers.DefaultSerializers.BigIntegerSerializer;
-import java.math.BigInteger;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class SerializationFactory {
- public static final Logger LOG = LoggerFactory
- .getLogger(SerializationFactory.class);
-
- public static Kryo getKryo(Map conf) {
- IKryoFactory kryoFactory = (IKryoFactory) Utils
- .newInstance((String) conf.get(Config.TOPOLOGY_KRYO_FACTORY));
- Kryo k = kryoFactory.getKryo(conf);
- if (WorkerClassLoader.getInstance() != null)
- k.setClassLoader(WorkerClassLoader.getInstance());
- k.register(byte[].class);
-
- /* tuple payload serializer is specified via configuration */
- String payloadSerializerName = (String) conf
- .get(Config.TOPOLOGY_TUPLE_SERIALIZER);
- try {
- Class serializerClass = Class.forName(
- payloadSerializerName, true, k.getClassLoader());
- Serializer serializer = resolveSerializerInstance(k,
- ListDelegate.class, serializerClass, conf);
- k.register(ListDelegate.class, serializer);
- } catch (ClassNotFoundException ex) {
- throw new RuntimeException(ex);
- }
-
- k.register(ArrayList.class, new ArrayListSerializer());
- k.register(HashMap.class, new HashMapSerializer());
- k.register(HashSet.class, new HashSetSerializer());
- k.register(BigInteger.class, new BigIntegerSerializer());
- k.register(TransactionAttempt.class);
- k.register(Values.class);
- k.register(backtype.storm.metric.api.IMetricsConsumer.DataPoint.class);
- k.register(backtype.storm.metric.api.IMetricsConsumer.TaskInfo.class);
- try {
- JavaBridge.registerPrimitives(k);
- JavaBridge.registerCollections(k);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
-
- Map<String, String> registrations = normalizeKryoRegister(conf);
-
- kryoFactory.preRegister(k, conf);
-
- boolean skipMissing = (Boolean) conf
- .get(Config.TOPOLOGY_SKIP_MISSING_KRYO_REGISTRATIONS);
- for (String klassName : registrations.keySet()) {
- String serializerClassName = registrations.get(klassName);
- try {
- Class klass = Class.forName(
- klassName, true, k.getClassLoader());
-
- Class serializerClass = null;
- if (serializerClassName != null)
- serializerClass = Class.forName(
- serializerClassName, true, k.getClassLoader());
- if (serializerClass == null) {
- k.register(klass);
- } else {
- k.register(
- klass,
- resolveSerializerInstance(k, klass,
- serializerClass, conf));
- }
- } catch (ClassNotFoundException e) {
- if (skipMissing) {
- LOG.info("Could not find serialization or class for "
- + serializerClassName
- + ". Skipping registration...");
- } else {
- throw new RuntimeException(e);
- }
- }
- }
-
- kryoFactory.postRegister(k, conf);
-
- if (conf.get(Config.TOPOLOGY_KRYO_DECORATORS) != null) {
- for (String klassName : (List<String>) conf
- .get(Config.TOPOLOGY_KRYO_DECORATORS)) {
- try {
- Class klass = Class.forName(
- klassName, true, k.getClassLoader());
- IKryoDecorator decorator = (IKryoDecorator) klass
- .newInstance();
- decorator.decorate(k);
- } catch (ClassNotFoundException e) {
- if (skipMissing) {
- LOG.info("Could not find kryo decorator named "
- + klassName + ". Skipping registration...");
- } else {
- throw new RuntimeException(e);
- }
- } catch (InstantiationException e) {
- throw new RuntimeException(e);
- } catch (IllegalAccessException e) {
- throw new RuntimeException(e);
- }
- }
- }
-
- kryoFactory.postDecorate(k, conf);
-
- return k;
- }
-
- public static class IdDictionary {
- Map<String, Map<String, Integer>> streamNametoId = new HashMap<String, Map<String, Integer>>();
- Map<String, Map<Integer, String>> streamIdToName = new HashMap<String, Map<Integer, String>>();
-
- public IdDictionary(StormTopology topology) {
- List<String> componentNames = new ArrayList<String>(topology
- .get_spouts().keySet());
- componentNames.addAll(topology.get_bolts().keySet());
- componentNames.addAll(topology.get_state_spouts().keySet());
-
- for (String name : componentNames) {
- ComponentCommon common = Utils.getComponentCommon(topology,
- name);
- List<String> streams = new ArrayList<String>(common
- .get_streams().keySet());
- streamNametoId.put(name, idify(streams));
- streamIdToName.put(name,
- Utils.reverseMap(streamNametoId.get(name)));
- }
- }
-
- public int getStreamId(String component, String stream) {
- return streamNametoId.get(component).get(stream);
- }
-
- public String getStreamName(String component, int stream) {
- return streamIdToName.get(component).get(stream);
- }
-
- private static Map<String, Integer> idify(List<String> names) {
- Collections.sort(names);
- Map<String, Integer> ret = new HashMap<String, Integer>();
- int i = 1;
- for (String name : names) {
- ret.put(name, i);
- i++;
- }
- return ret;
- }
- }
-
- private static Serializer resolveSerializerInstance(Kryo k,
- Class superClass, Class<? extends Serializer> serializerClass,
- Map conf) {
- try {
- try {
- return serializerClass.getConstructor(Kryo.class, Class.class,
- Map.class).newInstance(k, superClass, conf);
- } catch (Exception ex1) {
- try {
- return serializerClass.getConstructor(Kryo.class,
- Class.class).newInstance(k, superClass);
- } catch (Exception ex2) {
- try {
- return serializerClass.getConstructor(Kryo.class,
- Map.class).newInstance(k, conf);
- } catch (Exception ex3) {
- try {
- return serializerClass.getConstructor(Kryo.class)
- .newInstance(k);
- } catch (Exception ex4) {
- try {
- return serializerClass.getConstructor(
- Class.class, Map.class).newInstance(
- superClass, conf);
- } catch (Exception ex5) {
- try {
- return serializerClass.getConstructor(
- Class.class)
- .newInstance(superClass);
- } catch (Exception ex6) {
- return serializerClass.newInstance();
- }
- }
- }
- }
- }
- }
- } catch (Exception ex) {
- throw new IllegalArgumentException("Unable to create serializer \""
- + serializerClass.getName() + "\" for class: "
- + superClass.getName(), ex);
- }
- }
-
- private static Map<String, String> normalizeKryoRegister(Map conf) {
- // TODO: de-duplicate this logic with the code in nimbus
- Object res = conf.get(Config.TOPOLOGY_KRYO_REGISTER);
- if (res == null)
- return new TreeMap<String, String>();
- Map<String, String> ret = new HashMap<String, String>();
- if (res instanceof Map) {
- ret = (Map<String, String>) res;
- } else {
- for (Object o : (List) res) {
- if (o instanceof Map) {
- ret.putAll((Map) o);
- } else {
- ret.put((String) o, null);
- }
- }
- }
-
- // ensure always same order for registrations with TreeMap
- return new TreeMap<String, String>(ret);
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/serialization/types/ArrayListSerializer.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/serialization/types/ArrayListSerializer.java b/jstorm-client/src/main/java/backtype/storm/serialization/types/ArrayListSerializer.java
deleted file mode 100644
index e403a95..0000000
--- a/jstorm-client/src/main/java/backtype/storm/serialization/types/ArrayListSerializer.java
+++ /dev/null
@@ -1,14 +0,0 @@
-package backtype.storm.serialization.types;
-
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.serializers.CollectionSerializer;
-import java.util.ArrayList;
-import java.util.Collection;
-
-public class ArrayListSerializer extends CollectionSerializer {
- @Override
- public Collection create(Kryo kryo, Input input, Class<Collection> type) {
- return new ArrayList();
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/serialization/types/HashMapSerializer.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/serialization/types/HashMapSerializer.java b/jstorm-client/src/main/java/backtype/storm/serialization/types/HashMapSerializer.java
deleted file mode 100644
index c1f7456..0000000
--- a/jstorm-client/src/main/java/backtype/storm/serialization/types/HashMapSerializer.java
+++ /dev/null
@@ -1,14 +0,0 @@
-package backtype.storm.serialization.types;
-
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.serializers.MapSerializer;
-import java.util.HashMap;
-import java.util.Map;
-
-public class HashMapSerializer extends MapSerializer {
- @Override
- public Map create(Kryo kryo, Input input, Class<Map> type) {
- return new HashMap();
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/serialization/types/HashSetSerializer.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/serialization/types/HashSetSerializer.java b/jstorm-client/src/main/java/backtype/storm/serialization/types/HashSetSerializer.java
deleted file mode 100644
index b28bbd6..0000000
--- a/jstorm-client/src/main/java/backtype/storm/serialization/types/HashSetSerializer.java
+++ /dev/null
@@ -1,14 +0,0 @@
-package backtype.storm.serialization.types;
-
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.serializers.CollectionSerializer;
-import java.util.Collection;
-import java.util.HashSet;
-
-public class HashSetSerializer extends CollectionSerializer {
- @Override
- public Collection create(Kryo kryo, Input input, Class<Collection> type) {
- return new HashSet();
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/serialization/types/ListDelegateSerializer.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/serialization/types/ListDelegateSerializer.java b/jstorm-client/src/main/java/backtype/storm/serialization/types/ListDelegateSerializer.java
deleted file mode 100644
index 67242a2..0000000
--- a/jstorm-client/src/main/java/backtype/storm/serialization/types/ListDelegateSerializer.java
+++ /dev/null
@@ -1,14 +0,0 @@
-package backtype.storm.serialization.types;
-
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.serializers.CollectionSerializer;
-import backtype.storm.utils.ListDelegate;
-import java.util.Collection;
-
-public class ListDelegateSerializer extends CollectionSerializer {
- @Override
- public Collection create(Kryo kryo, Input input, Class<Collection> type) {
- return new ListDelegate();
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/spout/IMultiSchemableSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/spout/IMultiSchemableSpout.java b/jstorm-client/src/main/java/backtype/storm/spout/IMultiSchemableSpout.java
deleted file mode 100644
index ba31324..0000000
--- a/jstorm-client/src/main/java/backtype/storm/spout/IMultiSchemableSpout.java
+++ /dev/null
@@ -1,7 +0,0 @@
-package backtype.storm.spout;
-
-public interface IMultiSchemableSpout {
- MultiScheme getScheme();
-
- void setScheme(MultiScheme scheme);
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/spout/ISchemableSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/spout/ISchemableSpout.java b/jstorm-client/src/main/java/backtype/storm/spout/ISchemableSpout.java
deleted file mode 100644
index 5bbc869..0000000
--- a/jstorm-client/src/main/java/backtype/storm/spout/ISchemableSpout.java
+++ /dev/null
@@ -1,7 +0,0 @@
-package backtype.storm.spout;
-
-public interface ISchemableSpout {
- Scheme getScheme();
-
- void setScheme(Scheme scheme);
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/spout/ISpout.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/spout/ISpout.java b/jstorm-client/src/main/java/backtype/storm/spout/ISpout.java
deleted file mode 100644
index 67f94f3..0000000
--- a/jstorm-client/src/main/java/backtype/storm/spout/ISpout.java
+++ /dev/null
@@ -1,116 +0,0 @@
-package backtype.storm.spout;
-
-import backtype.storm.task.TopologyContext;
-import java.util.Map;
-import java.io.Serializable;
-
-/**
- * ISpout is the core interface for implementing spouts. A Spout is responsible
- * for feeding messages into the topology for processing. For every tuple
- * emitted by a spout, Storm will track the (potentially very large) DAG of
- * tuples generated based on a tuple emitted by the spout. When Storm detects
- * that every tuple in that DAG has been successfully processed, it will send an
- * ack message to the Spout.
- *
- * <p>
- * If a tuple fails to be fully process within the configured timeout for the
- * topology (see {@link backtype.storm.Config}), Storm will send a fail message
- * to the spout for the message.
- * </p>
- *
- * <p>
- * When a Spout emits a tuple, it can tag the tuple with a message id. The
- * message id can be any type. When Storm acks or fails a message, it will pass
- * back to the spout the same message id to identify which tuple it's referring
- * to. If the spout leaves out the message id, or sets it to null, then Storm
- * will not track the message and the spout will not receive any ack or fail
- * callbacks for the message.
- * </p>
- *
- * <p>
- * Storm executes ack, fail, and nextTuple all on the same thread. This means
- * that an implementor of an ISpout does not need to worry about concurrency
- * issues between those methods. However, it also means that an implementor must
- * ensure that nextTuple is non-blocking: otherwise the method could block acks
- * and fails that are pending to be processed.
- * </p>
- */
-public interface ISpout extends Serializable {
- /**
- * Called when a task for this component is initialized within a worker on
- * the cluster. It provides the spout with the environment in which the
- * spout executes.
- *
- * <p>
- * This includes the:
- * </p>
- *
- * @param conf
- * The Storm configuration for this spout. This is the
- * configuration provided to the topology merged in with cluster
- * configuration on this machine.
- * @param context
- * This object can be used to get information about this task's
- * place within the topology, including the task id and component
- * id of this task, input and output information, etc.
- * @param collector
- * The collector is used to emit tuples from this spout. Tuples
- * can be emitted at any time, including the open and close
- * methods. The collector is thread-safe and should be saved as
- * an instance variable of this spout object.
- */
- void open(Map conf, TopologyContext context, SpoutOutputCollector collector);
-
- /**
- * Called when an ISpout is going to be shutdown. There is no guarentee that
- * close will be called, because the supervisor kill -9's worker processes
- * on the cluster.
- *
- * <p>
- * The one context where close is guaranteed to be called is a topology is
- * killed when running Storm in local mode.
- * </p>
- */
- void close();
-
- /**
- * Called when a spout has been activated out of a deactivated mode.
- * nextTuple will be called on this spout soon. A spout can become activated
- * after having been deactivated when the topology is manipulated using the
- * `storm` client.
- */
- void activate();
-
- /**
- * Called when a spout has been deactivated. nextTuple will not be called
- * while a spout is deactivated. The spout may or may not be reactivated in
- * the future.
- */
- void deactivate();
-
- /**
- * When this method is called, Storm is requesting that the Spout emit
- * tuples to the output collector. This method should be non-blocking, so if
- * the Spout has no tuples to emit, this method should return. nextTuple,
- * ack, and fail are all called in a tight loop in a single thread in the
- * spout task. When there are no tuples to emit, it is courteous to have
- * nextTuple sleep for a short amount of time (like a single millisecond) so
- * as not to waste too much CPU.
- */
- void nextTuple();
-
- /**
- * Storm has determined that the tuple emitted by this spout with the msgId
- * identifier has been fully processed. Typically, an implementation of this
- * method will take that message off the queue and prevent it from being
- * replayed.
- */
- void ack(Object msgId);
-
- /**
- * The tuple emitted by this spout with the msgId identifier has failed to
- * be fully processed. Typically, an implementation of this method will put
- * that message back on the queue to be replayed at a later time.
- */
- void fail(Object msgId);
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/spout/ISpoutOutputCollector.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/spout/ISpoutOutputCollector.java b/jstorm-client/src/main/java/backtype/storm/spout/ISpoutOutputCollector.java
deleted file mode 100644
index 6b66b00..0000000
--- a/jstorm-client/src/main/java/backtype/storm/spout/ISpoutOutputCollector.java
+++ /dev/null
@@ -1,15 +0,0 @@
-package backtype.storm.spout;
-
-import java.util.List;
-
-public interface ISpoutOutputCollector {
- /**
- * Returns the task ids that received the tuples.
- */
- List<Integer> emit(String streamId, List<Object> tuple, Object messageId);
-
- void emitDirect(int taskId, String streamId, List<Object> tuple,
- Object messageId);
-
- void reportError(Throwable error);
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/spout/ISpoutWaitStrategy.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/spout/ISpoutWaitStrategy.java b/jstorm-client/src/main/java/backtype/storm/spout/ISpoutWaitStrategy.java
deleted file mode 100644
index f5a25b3..0000000
--- a/jstorm-client/src/main/java/backtype/storm/spout/ISpoutWaitStrategy.java
+++ /dev/null
@@ -1,18 +0,0 @@
-package backtype.storm.spout;
-
-import java.util.Map;
-
-/**
- * The strategy a spout needs to use when its waiting. Waiting is triggered in
- * one of two conditions:
- *
- * 1. nextTuple emits no tuples 2. The spout has hit maxSpoutPending and can't
- * emit any more tuples
- *
- * The default strategy sleeps for one millisecond.
- */
-public interface ISpoutWaitStrategy {
- void prepare(Map conf);
-
- void emptyEmit(long streak);
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/spout/MultiScheme.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/spout/MultiScheme.java b/jstorm-client/src/main/java/backtype/storm/spout/MultiScheme.java
deleted file mode 100644
index e67d036..0000000
--- a/jstorm-client/src/main/java/backtype/storm/spout/MultiScheme.java
+++ /dev/null
@@ -1,12 +0,0 @@
-package backtype.storm.spout;
-
-import java.util.List;
-import java.io.Serializable;
-
-import backtype.storm.tuple.Fields;
-
-public interface MultiScheme extends Serializable {
- public Iterable<List<Object>> deserialize(byte[] ser);
-
- public Fields getOutputFields();
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/spout/NothingEmptyEmitStrategy.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/spout/NothingEmptyEmitStrategy.java b/jstorm-client/src/main/java/backtype/storm/spout/NothingEmptyEmitStrategy.java
deleted file mode 100644
index c084b10..0000000
--- a/jstorm-client/src/main/java/backtype/storm/spout/NothingEmptyEmitStrategy.java
+++ /dev/null
@@ -1,14 +0,0 @@
-package backtype.storm.spout;
-
-import java.util.Map;
-
-public class NothingEmptyEmitStrategy implements ISpoutWaitStrategy {
- @Override
- public void emptyEmit(long streak) {
- }
-
- @Override
- public void prepare(Map conf) {
- throw new UnsupportedOperationException("Not supported yet.");
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/spout/RawMultiScheme.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/spout/RawMultiScheme.java b/jstorm-client/src/main/java/backtype/storm/spout/RawMultiScheme.java
deleted file mode 100644
index 2446b45..0000000
--- a/jstorm-client/src/main/java/backtype/storm/spout/RawMultiScheme.java
+++ /dev/null
@@ -1,20 +0,0 @@
-package backtype.storm.spout;
-
-import java.util.List;
-
-import backtype.storm.tuple.Fields;
-
-import static backtype.storm.utils.Utils.tuple;
-import static java.util.Arrays.asList;
-
-public class RawMultiScheme implements MultiScheme {
- @Override
- public Iterable<List<Object>> deserialize(byte[] ser) {
- return asList(tuple(ser));
- }
-
- @Override
- public Fields getOutputFields() {
- return new Fields("bytes");
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/spout/RawScheme.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/spout/RawScheme.java b/jstorm-client/src/main/java/backtype/storm/spout/RawScheme.java
deleted file mode 100644
index 46e9d1c..0000000
--- a/jstorm-client/src/main/java/backtype/storm/spout/RawScheme.java
+++ /dev/null
@@ -1,15 +0,0 @@
-package backtype.storm.spout;
-
-import backtype.storm.tuple.Fields;
-import java.util.List;
-import static backtype.storm.utils.Utils.tuple;
-
-public class RawScheme implements Scheme {
- public List<Object> deserialize(byte[] ser) {
- return tuple(ser);
- }
-
- public Fields getOutputFields() {
- return new Fields("bytes");
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/spout/Scheme.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/spout/Scheme.java b/jstorm-client/src/main/java/backtype/storm/spout/Scheme.java
deleted file mode 100644
index 26bf3ae..0000000
--- a/jstorm-client/src/main/java/backtype/storm/spout/Scheme.java
+++ /dev/null
@@ -1,11 +0,0 @@
-package backtype.storm.spout;
-
-import backtype.storm.tuple.Fields;
-import java.io.Serializable;
-import java.util.List;
-
-public interface Scheme extends Serializable {
- public List<Object> deserialize(byte[] ser);
-
- public Fields getOutputFields();
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/spout/SchemeAsMultiScheme.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/spout/SchemeAsMultiScheme.java b/jstorm-client/src/main/java/backtype/storm/spout/SchemeAsMultiScheme.java
deleted file mode 100644
index cc80ef9..0000000
--- a/jstorm-client/src/main/java/backtype/storm/spout/SchemeAsMultiScheme.java
+++ /dev/null
@@ -1,28 +0,0 @@
-package backtype.storm.spout;
-
-import java.util.Arrays;
-import java.util.List;
-
-import backtype.storm.tuple.Fields;
-
-public class SchemeAsMultiScheme implements MultiScheme {
- public final Scheme scheme;
-
- public SchemeAsMultiScheme(Scheme scheme) {
- this.scheme = scheme;
- }
-
- @Override
- public Iterable<List<Object>> deserialize(final byte[] ser) {
- List<Object> o = scheme.deserialize(ser);
- if (o == null)
- return null;
- else
- return Arrays.asList(o);
- }
-
- @Override
- public Fields getOutputFields() {
- return scheme.getOutputFields();
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/spout/ShellSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/spout/ShellSpout.java b/jstorm-client/src/main/java/backtype/storm/spout/ShellSpout.java
deleted file mode 100644
index a8e18dc..0000000
--- a/jstorm-client/src/main/java/backtype/storm/spout/ShellSpout.java
+++ /dev/null
@@ -1,260 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package backtype.storm.spout;
-
-import backtype.storm.Config;
-import backtype.storm.generated.ShellComponent;
-import backtype.storm.metric.api.IMetric;
-import backtype.storm.metric.api.rpc.IShellMetric;
-import backtype.storm.multilang.ShellMsg;
-import backtype.storm.multilang.SpoutMsg;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.utils.ShellProcess;
-import java.util.Map;
-import java.util.List;
-import java.util.TimerTask;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-import clojure.lang.RT;
-import com.google.common.util.concurrent.MoreExecutors;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-public class ShellSpout implements ISpout {
- public static Logger LOG = LoggerFactory.getLogger(ShellSpout.class);
-
- private SpoutOutputCollector _collector;
- private String[] _command;
- private ShellProcess _process;
-
- private TopologyContext _context;
-
- private SpoutMsg _spoutMsg;
-
- private int workerTimeoutMills;
- private ScheduledExecutorService heartBeatExecutorService;
- private AtomicLong lastHeartbeatTimestamp = new AtomicLong();
-
- public ShellSpout(ShellComponent component) {
- this(component.get_execution_command(), component.get_script());
- }
-
- public ShellSpout(String... command) {
- _command = command;
- }
-
- public void open(Map stormConf, TopologyContext context,
- SpoutOutputCollector collector) {
- _collector = collector;
- _context = context;
-
- workerTimeoutMills = 1000 * RT.intCast(stormConf.get(Config.SUPERVISOR_WORKER_TIMEOUT_SECS));
-
- _process = new ShellProcess(_command);
-
- Number subpid = _process.launch(stormConf, context);
- LOG.info("Launched subprocess with pid " + subpid);
-
- heartBeatExecutorService = MoreExecutors.getExitingScheduledExecutorService(new ScheduledThreadPoolExecutor(1));
- }
-
- public void close() {
- heartBeatExecutorService.shutdownNow();
- _process.destroy();
- }
-
- public void nextTuple() {
- if (_spoutMsg == null) {
- _spoutMsg = new SpoutMsg();
- }
- _spoutMsg.setCommand("next");
- _spoutMsg.setId("");
- querySubprocess();
- }
-
- public void ack(Object msgId) {
- if (_spoutMsg == null) {
- _spoutMsg = new SpoutMsg();
- }
- _spoutMsg.setCommand("ack");
- _spoutMsg.setId(msgId);
- querySubprocess();
- }
-
- public void fail(Object msgId) {
- if (_spoutMsg == null) {
- _spoutMsg = new SpoutMsg();
- }
- _spoutMsg.setCommand("fail");
- _spoutMsg.setId(msgId);
- querySubprocess();
- }
-
- private void handleMetrics(ShellMsg shellMsg) {
- //get metric name
- String name = shellMsg.getMetricName();
- if (name.isEmpty()) {
- throw new RuntimeException("Receive Metrics name is empty");
- }
-
- //get metric by name
- IMetric iMetric = _context.getRegisteredMetricByName(name);
- if (iMetric == null) {
- throw new RuntimeException("Could not find metric by name["+name+"] ");
- }
- if ( !(iMetric instanceof IShellMetric)) {
- throw new RuntimeException("Metric["+name+"] is not IShellMetric, can not call by RPC");
- }
- IShellMetric iShellMetric = (IShellMetric)iMetric;
-
- //call updateMetricFromRPC with params
- Object paramsObj = shellMsg.getMetricParams();
- try {
- iShellMetric.updateMetricFromRPC(paramsObj);
- } catch (RuntimeException re) {
- throw re;
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- private void querySubprocess() {
- try {
- _process.writeSpoutMsg(_spoutMsg);
-
- while (true) {
- ShellMsg shellMsg = _process.readShellMsg();
- String command = shellMsg.getCommand();
- if (command == null) {
- throw new IllegalArgumentException("Command not found in spout message: " + shellMsg);
- }
-
- setHeartbeat();
-
- if (command.equals("sync")) {
- return;
- } else if (command.equals("log")) {
- handleLog(shellMsg);
- } else if (command.equals("emit")) {
- String stream = shellMsg.getStream();
- Long task = shellMsg.getTask();
- List<Object> tuple = shellMsg.getTuple();
- Object messageId = shellMsg.getId();
- if (task == 0) {
- List<Integer> outtasks = _collector.emit(stream, tuple, messageId);
- if (shellMsg.areTaskIdsNeeded()) {
- _process.writeTaskIds(outtasks);
- }
- } else {
- _collector.emitDirect((int) task.longValue(), stream, tuple, messageId);
- }
- } else if (command.equals("metrics")) {
- handleMetrics(shellMsg);
- } else {
- throw new RuntimeException("Unknown command received: " + command);
- }
- }
- } catch (Exception e) {
- String processInfo = _process.getProcessInfoString() + _process.getProcessTerminationInfoString();
- throw new RuntimeException(processInfo, e);
- }
- }
-
- private void handleLog(ShellMsg shellMsg) {
- String msg = shellMsg.getMsg();
- msg = "ShellLog " + _process.getProcessInfoString() + " " + msg;
- ShellMsg.ShellLogLevel logLevel = shellMsg.getLogLevel();
-
- switch (logLevel) {
- case TRACE:
- LOG.trace(msg);
- break;
- case DEBUG:
- LOG.debug(msg);
- break;
- case INFO:
- LOG.info(msg);
- break;
- case WARN:
- LOG.warn(msg);
- break;
- case ERROR:
- LOG.error(msg);
- break;
- default:
- LOG.info(msg);
- break;
- }
- }
-
- @Override
- public void activate() {
- LOG.info("Start checking heartbeat...");
- // prevent timer to check heartbeat based on last thing before activate
- setHeartbeat();
- heartBeatExecutorService.scheduleAtFixedRate(new SpoutHeartbeatTimerTask(this), 1, 1, TimeUnit.SECONDS);
- }
-
- @Override
- public void deactivate() {
- heartBeatExecutorService.shutdownNow();
- }
-
- private void setHeartbeat() {
- lastHeartbeatTimestamp.set(System.currentTimeMillis());
- }
-
- private long getLastHeartbeat() {
- return lastHeartbeatTimestamp.get();
- }
-
- private void die(Throwable exception) {
- heartBeatExecutorService.shutdownNow();
-
- LOG.error("Halting process: ShellSpout died.", exception);
- _collector.reportError(exception);
- _process.destroy();
- System.exit(11);
- }
-
- private class SpoutHeartbeatTimerTask extends TimerTask {
- private ShellSpout spout;
-
- public SpoutHeartbeatTimerTask(ShellSpout spout) {
- this.spout = spout;
- }
-
- @Override
- public void run() {
- long currentTimeMillis = System.currentTimeMillis();
- long lastHeartbeat = getLastHeartbeat();
-
- LOG.debug("current time : {}, last heartbeat : {}, worker timeout (ms) : {}",
- currentTimeMillis, lastHeartbeat, workerTimeoutMills);
-
- if (currentTimeMillis - lastHeartbeat > workerTimeoutMills) {
- spout.die(new RuntimeException("subprocess heartbeat timeout"));
- }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/spout/SleepSpoutWaitStrategy.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/spout/SleepSpoutWaitStrategy.java b/jstorm-client/src/main/java/backtype/storm/spout/SleepSpoutWaitStrategy.java
deleted file mode 100644
index 0aa7f64..0000000
--- a/jstorm-client/src/main/java/backtype/storm/spout/SleepSpoutWaitStrategy.java
+++ /dev/null
@@ -1,25 +0,0 @@
-package backtype.storm.spout;
-
-import backtype.storm.Config;
-import java.util.Map;
-
-public class SleepSpoutWaitStrategy implements ISpoutWaitStrategy {
-
- long sleepMillis;
-
- @Override
- public void prepare(Map conf) {
- sleepMillis = ((Number) conf
- .get(Config.TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS))
- .longValue();
- }
-
- @Override
- public void emptyEmit(long streak) {
- try {
- Thread.sleep(sleepMillis);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/spout/SpoutOutputCollector.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/spout/SpoutOutputCollector.java b/jstorm-client/src/main/java/backtype/storm/spout/SpoutOutputCollector.java
deleted file mode 100644
index 069fb99..0000000
--- a/jstorm-client/src/main/java/backtype/storm/spout/SpoutOutputCollector.java
+++ /dev/null
@@ -1,125 +0,0 @@
-package backtype.storm.spout;
-
-import backtype.storm.task.OutputCollector;
-import backtype.storm.utils.Utils;
-import java.util.List;
-
-/**
- * This output collector exposes the API for emitting tuples from an
- * {@link backtype.storm.topology.IRichSpout}. The main difference between this
- * output collector and {@link OutputCollector} for
- * {@link backtype.storm.topology.IRichBolt} is that spouts can tag messages
- * with ids so that they can be acked or failed later on. This is the Spout
- * portion of Storm's API to guarantee that each message is fully processed at
- * least once.
- */
-public class SpoutOutputCollector implements ISpoutOutputCollector {
- ISpoutOutputCollector _delegate;
-
- public SpoutOutputCollector(ISpoutOutputCollector delegate) {
- _delegate = delegate;
- }
-
- /**
- * Emits a new tuple to the specified output stream with the given message
- * ID. When Storm detects that this tuple has been fully processed, or has
- * failed to be fully processed, the spout will receive an ack or fail
- * callback respectively with the messageId as long as the messageId was not
- * null. If the messageId was null, Storm will not track the tuple and no
- * callback will be received. The emitted values must be immutable.
- *
- * @return the list of task ids that this tuple was sent to
- */
- public List<Integer> emit(String streamId, List<Object> tuple,
- Object messageId) {
- return _delegate.emit(streamId, tuple, messageId);
- }
-
- /**
- * Emits a new tuple to the default output stream with the given message ID.
- * When Storm detects that this tuple has been fully processed, or has
- * failed to be fully processed, the spout will receive an ack or fail
- * callback respectively with the messageId as long as the messageId was not
- * null. If the messageId was null, Storm will not track the tuple and no
- * callback will be received. The emitted values must be immutable.
- *
- * @return the list of task ids that this tuple was sent to
- */
- public List<Integer> emit(List<Object> tuple, Object messageId) {
- return emit(Utils.DEFAULT_STREAM_ID, tuple, messageId);
- }
-
- /**
- * Emits a tuple to the default output stream with a null message id. Storm
- * will not track this message so ack and fail will never be called for this
- * tuple. The emitted values must be immutable.
- */
- public List<Integer> emit(List<Object> tuple) {
- return emit(tuple, null);
- }
-
- /**
- * Emits a tuple to the specified output stream with a null message id.
- * Storm will not track this message so ack and fail will never be called
- * for this tuple. The emitted values must be immutable.
- */
- public List<Integer> emit(String streamId, List<Object> tuple) {
- return emit(streamId, tuple, null);
- }
-
- /**
- * Emits a tuple to the specified task on the specified output stream. This
- * output stream must have been declared as a direct stream, and the
- * specified task must use a direct grouping on this stream to receive the
- * message. The emitted values must be immutable.
- */
- public void emitDirect(int taskId, String streamId, List<Object> tuple,
- Object messageId) {
- _delegate.emitDirect(taskId, streamId, tuple, messageId);
- }
-
- /**
- * Emits a tuple to the specified task on the default output stream. This
- * output stream must have been declared as a direct stream, and the
- * specified task must use a direct grouping on this stream to receive the
- * message. The emitted values must be immutable.
- */
- public void emitDirect(int taskId, List<Object> tuple, Object messageId) {
- emitDirect(taskId, Utils.DEFAULT_STREAM_ID, tuple, messageId);
- }
-
- /**
- * Emits a tuple to the specified task on the specified output stream. This
- * output stream must have been declared as a direct stream, and the
- * specified task must use a direct grouping on this stream to receive the
- * message. The emitted values must be immutable.
- *
- * <p>
- * Because no message id is specified, Storm will not track this message so
- * ack and fail will never be called for this tuple.
- * </p>
- */
- public void emitDirect(int taskId, String streamId, List<Object> tuple) {
- emitDirect(taskId, streamId, tuple, null);
- }
-
- /**
- * Emits a tuple to the specified task on the default output stream. This
- * output stream must have been declared as a direct stream, and the
- * specified task must use a direct grouping on this stream to receive the
- * message. The emitted values must be immutable.
- *
- * <p>
- * Because no message id is specified, Storm will not track this message so
- * ack and fail will never be called for this tuple.
- * </p>
- */
- public void emitDirect(int taskId, List<Object> tuple) {
- emitDirect(taskId, tuple, null);
- }
-
- @Override
- public void reportError(Throwable error) {
- _delegate.reportError(error);
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/state/IStateSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/state/IStateSpout.java b/jstorm-client/src/main/java/backtype/storm/state/IStateSpout.java
deleted file mode 100644
index 2c8b300..0000000
--- a/jstorm-client/src/main/java/backtype/storm/state/IStateSpout.java
+++ /dev/null
@@ -1,15 +0,0 @@
-package backtype.storm.state;
-
-import backtype.storm.task.TopologyContext;
-import java.io.Serializable;
-import java.util.Map;
-
-public interface IStateSpout extends Serializable {
- void open(Map conf, TopologyContext context);
-
- void close();
-
- void nextTuple(StateSpoutOutputCollector collector);
-
- void synchronize(SynchronizeOutputCollector collector);
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/state/IStateSpoutOutputCollector.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/state/IStateSpoutOutputCollector.java b/jstorm-client/src/main/java/backtype/storm/state/IStateSpoutOutputCollector.java
deleted file mode 100644
index d26ed6b..0000000
--- a/jstorm-client/src/main/java/backtype/storm/state/IStateSpoutOutputCollector.java
+++ /dev/null
@@ -1,5 +0,0 @@
-package backtype.storm.state;
-
-public interface IStateSpoutOutputCollector extends ISynchronizeOutputCollector {
- void remove(int streamId, Object id);
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/state/ISubscribedState.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/state/ISubscribedState.java b/jstorm-client/src/main/java/backtype/storm/state/ISubscribedState.java
deleted file mode 100644
index 4256a0a..0000000
--- a/jstorm-client/src/main/java/backtype/storm/state/ISubscribedState.java
+++ /dev/null
@@ -1,9 +0,0 @@
-package backtype.storm.state;
-
-import backtype.storm.tuple.Tuple;
-
-public interface ISubscribedState {
- void set(Object id, Tuple tuple);
-
- void remove(Object id);
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/state/ISynchronizeOutputCollector.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/state/ISynchronizeOutputCollector.java b/jstorm-client/src/main/java/backtype/storm/state/ISynchronizeOutputCollector.java
deleted file mode 100644
index 97a8a8e..0000000
--- a/jstorm-client/src/main/java/backtype/storm/state/ISynchronizeOutputCollector.java
+++ /dev/null
@@ -1,7 +0,0 @@
-package backtype.storm.state;
-
-import java.util.List;
-
-public interface ISynchronizeOutputCollector {
- void add(int streamId, Object id, List<Object> tuple);
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/state/StateSpoutOutputCollector.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/state/StateSpoutOutputCollector.java b/jstorm-client/src/main/java/backtype/storm/state/StateSpoutOutputCollector.java
deleted file mode 100644
index 3156e95..0000000
--- a/jstorm-client/src/main/java/backtype/storm/state/StateSpoutOutputCollector.java
+++ /dev/null
@@ -1,11 +0,0 @@
-package backtype.storm.state;
-
-public class StateSpoutOutputCollector extends SynchronizeOutputCollector
- implements IStateSpoutOutputCollector {
-
- @Override
- public void remove(int streamId, Object id) {
- throw new UnsupportedOperationException("Not supported yet.");
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/state/SynchronizeOutputCollector.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/state/SynchronizeOutputCollector.java b/jstorm-client/src/main/java/backtype/storm/state/SynchronizeOutputCollector.java
deleted file mode 100644
index 9474fa2..0000000
--- a/jstorm-client/src/main/java/backtype/storm/state/SynchronizeOutputCollector.java
+++ /dev/null
@@ -1,12 +0,0 @@
-package backtype.storm.state;
-
-import java.util.List;
-
-public class SynchronizeOutputCollector implements ISynchronizeOutputCollector {
-
- @Override
- public void add(int streamId, Object id, List<Object> tuple) {
- throw new UnsupportedOperationException("Not supported yet.");
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/task/GeneralTopologyContext.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/task/GeneralTopologyContext.java b/jstorm-client/src/main/java/backtype/storm/task/GeneralTopologyContext.java
deleted file mode 100644
index 4817fb4..0000000
--- a/jstorm-client/src/main/java/backtype/storm/task/GeneralTopologyContext.java
+++ /dev/null
@@ -1,206 +0,0 @@
-package backtype.storm.task;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.json.simple.JSONAware;
-
-import backtype.storm.Config;
-import backtype.storm.Constants;
-import backtype.storm.generated.ComponentCommon;
-import backtype.storm.generated.GlobalStreamId;
-import backtype.storm.generated.Grouping;
-import backtype.storm.generated.StormTopology;
-import backtype.storm.tuple.Fields;
-import backtype.storm.utils.ThriftTopologyUtils;
-import backtype.storm.utils.Utils;
-
-public class GeneralTopologyContext implements JSONAware {
- private StormTopology _topology;
- private Map<Integer, String> _taskToComponent;
- private Map<String, List<Integer>> _componentToTasks;
- private Map<String, Map<String, Fields>> _componentToStreamToFields;
- private String _topologyId;
- protected Map _stormConf;
-
- // pass in componentToSortedTasks for the case of running tons of tasks in
- // single executor
- public GeneralTopologyContext(StormTopology topology, Map stormConf,
- Map<Integer, String> taskToComponent,
- Map<String, List<Integer>> componentToSortedTasks,
- Map<String, Map<String, Fields>> componentToStreamToFields,
- String topologyId) {
- _topology = topology;
- _stormConf = stormConf;
- _taskToComponent = taskToComponent;
- _topologyId = topologyId;
- _componentToTasks = componentToSortedTasks;
- _componentToStreamToFields = componentToStreamToFields;
- }
-
- /**
- * Gets the unique id assigned to this topology. The id is the storm name
- * with a unique nonce appended to it.
- *
- * @return the topology id
- */
- public String getTopologyId() {
- return _topologyId;
- }
-
- /**
- * Please use the getTopologId() instead.
- *
- * @return the topology id
- */
- @Deprecated
- public String getStormId() {
- return _topologyId;
- }
-
- /**
- * Gets the Thrift object representing the topology.
- *
- * @return the Thrift definition representing the topology
- */
- public StormTopology getRawTopology() {
- return _topology;
- }
-
- /**
- * Gets the component id for the specified task id. The component id maps to
- * a component id specified for a Spout or Bolt in the topology definition.
- *
- * @param taskId
- * the task id
- * @return the component id for the input task id
- */
- public String getComponentId(int taskId) {
- if (taskId == Constants.SYSTEM_TASK_ID) {
- return Constants.SYSTEM_COMPONENT_ID;
- } else {
- return _taskToComponent.get(taskId);
- }
- }
-
- /**
- * Gets the set of streams declared for the specified component.
- */
- public Set<String> getComponentStreams(String componentId) {
- return getComponentCommon(componentId).get_streams().keySet();
- }
-
- /**
- * Gets the task ids allocated for the given component id. The task ids are
- * always returned in ascending order.
- */
- public List<Integer> getComponentTasks(String componentId) {
- List<Integer> ret = _componentToTasks.get(componentId);
- if (ret == null)
- return new ArrayList<Integer>();
- else
- return new ArrayList<Integer>(ret);
- }
-
- /**
- * Gets the declared output fields for the specified component/stream.
- */
- public Fields getComponentOutputFields(String componentId, String streamId) {
- Fields ret = _componentToStreamToFields.get(componentId).get(streamId);
- if (ret == null) {
- throw new IllegalArgumentException(
- "No output fields defined for component:stream "
- + componentId + ":" + streamId);
- }
- return ret;
- }
-
- /**
- * Gets the declared output fields for the specified global stream id.
- */
- public Fields getComponentOutputFields(GlobalStreamId id) {
- return getComponentOutputFields(id.get_componentId(), id.get_streamId());
- }
-
- /**
- * Gets the declared inputs to the specified component.
- *
- * @return A map from subscribed component/stream to the grouping subscribed
- * with.
- */
- public Map<GlobalStreamId, Grouping> getSources(String componentId) {
- return getComponentCommon(componentId).get_inputs();
- }
-
- /**
- * Gets information about who is consuming the outputs of the specified
- * component, and how.
- *
- * @return Map from stream id to component id to the Grouping used.
- */
- public Map<String, Map<String, Grouping>> getTargets(String componentId) {
- Map<String, Map<String, Grouping>> ret = new HashMap<String, Map<String, Grouping>>();
- for (String otherComponentId : getComponentIds()) {
- Map<GlobalStreamId, Grouping> inputs = getComponentCommon(
- otherComponentId).get_inputs();
- for (GlobalStreamId id : inputs.keySet()) {
- if (id.get_componentId().equals(componentId)) {
- Map<String, Grouping> curr = ret.get(id.get_streamId());
- if (curr == null)
- curr = new HashMap<String, Grouping>();
- curr.put(otherComponentId, inputs.get(id));
- ret.put(id.get_streamId(), curr);
- }
- }
- }
- return ret;
- }
-
- @Override
- public String toJSONString() {
- Map obj = new HashMap();
- obj.put("task->component", _taskToComponent);
- // TODO: jsonify StormTopology
- // at the minimum should send source info
- return Utils.to_json(obj);
- }
-
- /**
- * Gets a map from task id to component id.
- */
- public Map<Integer, String> getTaskToComponent() {
- return _taskToComponent;
- }
-
- /**
- * Gets a list of all component ids in this topology
- */
- public Set<String> getComponentIds() {
- return ThriftTopologyUtils.getComponentIds(getRawTopology());
- }
-
- public ComponentCommon getComponentCommon(String componentId) {
- return ThriftTopologyUtils.getComponentCommon(getRawTopology(),
- componentId);
- }
-
- public int maxTopologyMessageTimeout() {
- Integer max = Utils.getInt(_stormConf
- .get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS));
- for (String spout : getRawTopology().get_spouts().keySet()) {
- ComponentCommon common = getComponentCommon(spout);
- String jsonConf = common.get_json_conf();
- if (jsonConf != null) {
- Map conf = (Map) Utils.from_json(jsonConf);
- Object comp = conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS);
- if (comp != null) {
- max = Math.max(Utils.getInt(comp), max);
- }
- }
- }
- return max;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/task/IBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/task/IBolt.java b/jstorm-client/src/main/java/backtype/storm/task/IBolt.java
deleted file mode 100644
index bfffa14..0000000
--- a/jstorm-client/src/main/java/backtype/storm/task/IBolt.java
+++ /dev/null
@@ -1,98 +0,0 @@
-package backtype.storm.task;
-
-import backtype.storm.tuple.Tuple;
-import java.util.Map;
-import java.io.Serializable;
-
-/**
- * An IBolt represents a component that takes tuples as input and produces
- * tuples as output. An IBolt can do everything from filtering to joining to
- * functions to aggregations. It does not have to process a tuple immediately
- * and may hold onto tuples to process later.
- *
- * <p>
- * A bolt's lifecycle is as follows:
- * </p>
- *
- * <p>
- * IBolt object created on client machine. The IBolt is serialized into the
- * topology (using Java serialization) and submitted to the master machine of
- * the cluster (Nimbus). Nimbus then launches workers which deserialize the
- * object, call prepare on it, and then start processing tuples.
- * </p>
- *
- * <p>
- * If you want to parameterize an IBolt, you should set the parameter's through
- * its constructor and save the parameterization state as instance variables
- * (which will then get serialized and shipped to every task executing this bolt
- * across the cluster).
- * </p>
- *
- * <p>
- * When defining bolts in Java, you should use the IRichBolt interface which
- * adds necessary methods for using the Java TopologyBuilder API.
- * </p>
- */
-public interface IBolt extends Serializable {
- /**
- * Called when a task for this component is initialized within a worker on
- * the cluster. It provides the bolt with the environment in which the bolt
- * executes.
- *
- * <p>
- * This includes the:
- * </p>
- *
- * @param stormConf
- * The Storm configuration for this bolt. This is the
- * configuration provided to the topology merged in with cluster
- * configuration on this machine.
- * @param context
- * This object can be used to get information about this task's
- * place within the topology, including the task id and component
- * id of this task, input and output information, etc.
- * @param collector
- * The collector is used to emit tuples from this bolt. Tuples
- * can be emitted at any time, including the prepare and cleanup
- * methods. The collector is thread-safe and should be saved as
- * an instance variable of this bolt object.
- */
- void prepare(Map stormConf, TopologyContext context,
- OutputCollector collector);
-
- /**
- * Process a single tuple of input. The Tuple object contains metadata on it
- * about which component/stream/task it came from. The values of the Tuple
- * can be accessed using Tuple#getValue. The IBolt does not have to process
- * the Tuple immediately. It is perfectly fine to hang onto a tuple and
- * process it later (for instance, to do an aggregation or join).
- *
- * <p>
- * Tuples should be emitted using the OutputCollector provided through the
- * prepare method. It is required that all input tuples are acked or failed
- * at some point using the OutputCollector. Otherwise, Storm will be unable
- * to determine when tuples coming off the spouts have been completed.
- * </p>
- *
- * <p>
- * For the common case of acking an input tuple at the end of the execute
- * method, see IBasicBolt which automates this.
- * </p>
- *
- * @param input
- * The input tuple to be processed.
- */
- void execute(Tuple input);
-
- /**
- * Called when an IBolt is going to be shutdown. There is no guarentee that
- * cleanup will be called, because the supervisor kill -9's worker processes
- * on the cluster.
- *
- * <p>
- * The one context where cleanup is guaranteed to be called is when a
- * topology is killed when running Storm in local mode.
- * </p>
- */
- void cleanup();
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/task/IErrorReporter.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/task/IErrorReporter.java b/jstorm-client/src/main/java/backtype/storm/task/IErrorReporter.java
deleted file mode 100644
index ae04710..0000000
--- a/jstorm-client/src/main/java/backtype/storm/task/IErrorReporter.java
+++ /dev/null
@@ -1,5 +0,0 @@
-package backtype.storm.task;
-
-public interface IErrorReporter {
- void reportError(Throwable error);
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/task/IMetricsContext.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/task/IMetricsContext.java b/jstorm-client/src/main/java/backtype/storm/task/IMetricsContext.java
deleted file mode 100644
index d4ace69..0000000
--- a/jstorm-client/src/main/java/backtype/storm/task/IMetricsContext.java
+++ /dev/null
@@ -1,18 +0,0 @@
-package backtype.storm.task;
-
-import backtype.storm.metric.api.CombinedMetric;
-import backtype.storm.metric.api.ICombiner;
-import backtype.storm.metric.api.IMetric;
-import backtype.storm.metric.api.IReducer;
-import backtype.storm.metric.api.ReducedMetric;
-
-public interface IMetricsContext {
- <T extends IMetric> T registerMetric(String name, T metric,
- int timeBucketSizeInSecs);
-
- ReducedMetric registerMetric(String name, IReducer reducer,
- int timeBucketSizeInSecs);
-
- CombinedMetric registerMetric(String name, ICombiner combiner,
- int timeBucketSizeInSecs);
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/task/IOutputCollector.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/task/IOutputCollector.java b/jstorm-client/src/main/java/backtype/storm/task/IOutputCollector.java
deleted file mode 100644
index dcf2217..0000000
--- a/jstorm-client/src/main/java/backtype/storm/task/IOutputCollector.java
+++ /dev/null
@@ -1,20 +0,0 @@
-package backtype.storm.task;
-
-import backtype.storm.tuple.Tuple;
-import java.util.Collection;
-import java.util.List;
-
-public interface IOutputCollector extends IErrorReporter {
- /**
- * Returns the task ids that received the tuples.
- */
- List<Integer> emit(String streamId, Collection<Tuple> anchors,
- List<Object> tuple);
-
- void emitDirect(int taskId, String streamId, Collection<Tuple> anchors,
- List<Object> tuple);
-
- void ack(Tuple input);
-
- void fail(Tuple input);
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/task/OutputCollector.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/task/OutputCollector.java b/jstorm-client/src/main/java/backtype/storm/task/OutputCollector.java
deleted file mode 100644
index 9cee3a8..0000000
--- a/jstorm-client/src/main/java/backtype/storm/task/OutputCollector.java
+++ /dev/null
@@ -1,245 +0,0 @@
-package backtype.storm.task;
-
-import backtype.storm.tuple.Tuple;
-import backtype.storm.utils.Utils;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-
-/**
- * This output collector exposes the API for emitting tuples from an IRichBolt.
- * This is the core API for emitting tuples. For a simpler API, and a more
- * restricted form of stream processing, see IBasicBolt and
- * BasicOutputCollector.
- */
-public class OutputCollector implements IOutputCollector {
- private IOutputCollector _delegate;
-
- public OutputCollector(IOutputCollector delegate) {
- _delegate = delegate;
- }
-
- /**
- * Emits a new tuple to a specific stream with a single anchor. The emitted
- * values must be immutable.
- *
- * @param streamId
- * the stream to emit to
- * @param anchor
- * the tuple to anchor to
- * @param tuple
- * the new output tuple from this bolt
- * @return the list of task ids that this new tuple was sent to
- */
- public List<Integer> emit(String streamId, Tuple anchor, List<Object> tuple) {
- return emit(streamId, Arrays.asList(anchor), tuple);
- }
-
- /**
- * Emits a new unanchored tuple to the specified stream. Because it's
- * unanchored, if a failure happens downstream, this new tuple won't affect
- * whether any spout tuples are considered failed or not. The emitted values
- * must be immutable.
- *
- * @param streamId
- * the stream to emit to
- * @param tuple
- * the new output tuple from this bolt
- * @return the list of task ids that this new tuple was sent to
- */
- public List<Integer> emit(String streamId, List<Object> tuple) {
- return emit(streamId, (List) null, tuple);
- }
-
- /**
- * Emits a new tuple to the default stream anchored on a group of input
- * tuples. The emitted values must be immutable.
- *
- * @param anchors
- * the tuples to anchor to
- * @param tuple
- * the new output tuple from this bolt
- * @return the list of task ids that this new tuple was sent to
- */
- public List<Integer> emit(Collection<Tuple> anchors, List<Object> tuple) {
- return emit(Utils.DEFAULT_STREAM_ID, anchors, tuple);
- }
-
- /**
- * Emits a new tuple to the default stream anchored on a single tuple. The
- * emitted values must be immutable.
- *
- * @param anchor
- * the tuple to anchor to
- * @param tuple
- * the new output tuple from this bolt
- * @return the list of task ids that this new tuple was sent to
- */
- public List<Integer> emit(Tuple anchor, List<Object> tuple) {
- return emit(Utils.DEFAULT_STREAM_ID, anchor, tuple);
- }
-
- /**
- * Emits a new unanchored tuple to the default stream. Beacuse it's
- * unanchored, if a failure happens downstream, this new tuple won't affect
- * whether any spout tuples are considered failed or not. The emitted values
- * must be immutable.
- *
- * @param tuple
- * the new output tuple from this bolt
- * @return the list of task ids that this new tuple was sent to
- */
- public List<Integer> emit(List<Object> tuple) {
- return emit(Utils.DEFAULT_STREAM_ID, tuple);
- }
-
- /**
- * Emits a tuple directly to the specified task id on the specified stream.
- * If the target bolt does not subscribe to this bolt using a direct
- * grouping, the tuple will not be sent. If the specified output stream is
- * not declared as direct, or the target bolt subscribes with a non-direct
- * grouping, an error will occur at runtime. The emitted values must be
- * immutable.
- *
- * @param taskId
- * the taskId to send the new tuple to
- * @param streamId
- * the stream to send the tuple on. It must be declared as a
- * direct stream in the topology definition.
- * @param anchor
- * the tuple to anchor to
- * @param tuple
- * the new output tuple from this bolt
- */
- public void emitDirect(int taskId, String streamId, Tuple anchor,
- List<Object> tuple) {
- emitDirect(taskId, streamId, Arrays.asList(anchor), tuple);
- }
-
- /**
- * Emits a tuple directly to the specified task id on the specified stream.
- * If the target bolt does not subscribe to this bolt using a direct
- * grouping, the tuple will not be sent. If the specified output stream is
- * not declared as direct, or the target bolt subscribes with a non-direct
- * grouping, an error will occur at runtime. Note that this method does not
- * use anchors, so downstream failures won't affect the failure status of
- * any spout tuples. The emitted values must be immutable.
- *
- * @param taskId
- * the taskId to send the new tuple to
- * @param streamId
- * the stream to send the tuple on. It must be declared as a
- * direct stream in the topology definition.
- * @param tuple
- * the new output tuple from this bolt
- */
- public void emitDirect(int taskId, String streamId, List<Object> tuple) {
- emitDirect(taskId, streamId, (List) null, tuple);
- }
-
- /**
- * Emits a tuple directly to the specified task id on the default stream. If
- * the target bolt does not subscribe to this bolt using a direct grouping,
- * the tuple will not be sent. If the specified output stream is not
- * declared as direct, or the target bolt subscribes with a non-direct
- * grouping, an error will occur at runtime. The emitted values must be
- * immutable.
- *
- * <p>
- * The default stream must be declared as direct in the topology definition.
- * See OutputDeclarer#declare for how this is done when defining topologies
- * in Java.
- * </p>
- *
- * @param taskId
- * the taskId to send the new tuple to
- * @param anchosr
- * the tuples to anchor to
- * @param tuple
- * the new output tuple from this bolt
- */
- public void emitDirect(int taskId, Collection<Tuple> anchors,
- List<Object> tuple) {
- emitDirect(taskId, Utils.DEFAULT_STREAM_ID, anchors, tuple);
- }
-
- /**
- * Emits a tuple directly to the specified task id on the default stream. If
- * the target bolt does not subscribe to this bolt using a direct grouping,
- * the tuple will not be sent. If the specified output stream is not
- * declared as direct, or the target bolt subscribes with a non-direct
- * grouping, an error will occur at runtime. The emitted values must be
- * immutable.
- *
- * <p>
- * The default stream must be declared as direct in the topology definition.
- * See OutputDeclarer#declare for how this is done when defining topologies
- * in Java.
- * </p>
- *
- * @param taskId
- * the taskId to send the new tuple to
- * @param anchor
- * the tuple to anchor to
- * @param tuple
- * the new output tuple from this bolt
- */
- public void emitDirect(int taskId, Tuple anchor, List<Object> tuple) {
- emitDirect(taskId, Utils.DEFAULT_STREAM_ID, anchor, tuple);
- }
-
- /**
- * Emits a tuple directly to the specified task id on the default stream. If
- * the target bolt does not subscribe to this bolt using a direct grouping,
- * the tuple will not be sent. If the specified output stream is not
- * declared as direct, or the target bolt subscribes with a non-direct
- * grouping, an error will occur at runtime. The emitted values must be
- * immutable.
- *
- * <p>
- * The default stream must be declared as direct in the topology definition.
- * See OutputDeclarer#declare for how this is done when defining topologies
- * in Java.
- * </p>
- *
- * <p>
- * Note that this method does not use anchors, so downstream failures won't
- * affect the failure status of any spout tuples.
- * </p>
- *
- * @param taskId
- * the taskId to send the new tuple to
- * @param tuple
- * the new output tuple from this bolt
- */
- public void emitDirect(int taskId, List<Object> tuple) {
- emitDirect(taskId, Utils.DEFAULT_STREAM_ID, tuple);
- }
-
- @Override
- public List<Integer> emit(String streamId, Collection<Tuple> anchors,
- List<Object> tuple) {
- return _delegate.emit(streamId, anchors, tuple);
- }
-
- @Override
- public void emitDirect(int taskId, String streamId,
- Collection<Tuple> anchors, List<Object> tuple) {
- _delegate.emitDirect(taskId, streamId, anchors, tuple);
- }
-
- @Override
- public void ack(Tuple input) {
- _delegate.ack(input);
- }
-
- @Override
- public void fail(Tuple input) {
- _delegate.fail(input);
- }
-
- @Override
- public void reportError(Throwable error) {
- _delegate.reportError(error);
- }
-}