You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/11/05 21:41:08 UTC
[29/60] [abbrv] [partial] storm git commit: Release 2.0.4-SNAPSHOT
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/generated/WorkerSummary.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/generated/WorkerSummary.java b/jstorm-client/src/main/java/backtype/storm/generated/WorkerSummary.java
deleted file mode 100644
index 0bf878c..0000000
--- a/jstorm-client/src/main/java/backtype/storm/generated/WorkerSummary.java
+++ /dev/null
@@ -1,560 +0,0 @@
-/**
- * Autogenerated by Thrift Compiler (0.7.0)
- *
- * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
- */
-package backtype.storm.generated;
-
-import org.apache.commons.lang.builder.HashCodeBuilder;
-import java.util.List;
-import java.util.ArrayList;
-import java.util.Map;
-import java.util.HashMap;
-import java.util.EnumMap;
-import java.util.Set;
-import java.util.HashSet;
-import java.util.EnumSet;
-import java.util.Collections;
-import java.util.BitSet;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class WorkerSummary implements org.apache.thrift7.TBase<WorkerSummary, WorkerSummary._Fields>, java.io.Serializable, Cloneable {
- private static final org.apache.thrift7.protocol.TStruct STRUCT_DESC = new org.apache.thrift7.protocol.TStruct("WorkerSummary");
-
- private static final org.apache.thrift7.protocol.TField PORT_FIELD_DESC = new org.apache.thrift7.protocol.TField("port", org.apache.thrift7.protocol.TType.I32, (short)1);
- private static final org.apache.thrift7.protocol.TField TOPOLOGY_FIELD_DESC = new org.apache.thrift7.protocol.TField("topology", org.apache.thrift7.protocol.TType.STRING, (short)2);
- private static final org.apache.thrift7.protocol.TField TASKS_FIELD_DESC = new org.apache.thrift7.protocol.TField("tasks", org.apache.thrift7.protocol.TType.LIST, (short)3);
-
- private int port; // required
- private String topology; // required
- private List<TaskSummary> tasks; // required
-
- /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
- public enum _Fields implements org.apache.thrift7.TFieldIdEnum {
- PORT((short)1, "port"),
- TOPOLOGY((short)2, "topology"),
- TASKS((short)3, "tasks");
-
- private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
-
- static {
- for (_Fields field : EnumSet.allOf(_Fields.class)) {
- byName.put(field.getFieldName(), field);
- }
- }
-
- /**
- * Find the _Fields constant that matches fieldId, or null if it's not found.
- */
- public static _Fields findByThriftId(int fieldId) {
- switch(fieldId) {
- case 1: // PORT
- return PORT;
- case 2: // TOPOLOGY
- return TOPOLOGY;
- case 3: // TASKS
- return TASKS;
- default:
- return null;
- }
- }
-
- /**
- * Find the _Fields constant that matches fieldId, throwing an exception
- * if it is not found.
- */
- public static _Fields findByThriftIdOrThrow(int fieldId) {
- _Fields fields = findByThriftId(fieldId);
- if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
- return fields;
- }
-
- /**
- * Find the _Fields constant that matches name, or null if it's not found.
- */
- public static _Fields findByName(String name) {
- return byName.get(name);
- }
-
- private final short _thriftId;
- private final String _fieldName;
-
- _Fields(short thriftId, String fieldName) {
- _thriftId = thriftId;
- _fieldName = fieldName;
- }
-
- public short getThriftFieldId() {
- return _thriftId;
- }
-
- public String getFieldName() {
- return _fieldName;
- }
- }
-
- // isset id assignments
- private static final int __PORT_ISSET_ID = 0;
- private BitSet __isset_bit_vector = new BitSet(1);
-
- public static final Map<_Fields, org.apache.thrift7.meta_data.FieldMetaData> metaDataMap;
- static {
- Map<_Fields, org.apache.thrift7.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift7.meta_data.FieldMetaData>(_Fields.class);
- tmpMap.put(_Fields.PORT, new org.apache.thrift7.meta_data.FieldMetaData("port", org.apache.thrift7.TFieldRequirementType.REQUIRED,
- new org.apache.thrift7.meta_data.FieldValueMetaData(org.apache.thrift7.protocol.TType.I32)));
- tmpMap.put(_Fields.TOPOLOGY, new org.apache.thrift7.meta_data.FieldMetaData("topology", org.apache.thrift7.TFieldRequirementType.REQUIRED,
- new org.apache.thrift7.meta_data.FieldValueMetaData(org.apache.thrift7.protocol.TType.STRING)));
- tmpMap.put(_Fields.TASKS, new org.apache.thrift7.meta_data.FieldMetaData("tasks", org.apache.thrift7.TFieldRequirementType.REQUIRED,
- new org.apache.thrift7.meta_data.ListMetaData(org.apache.thrift7.protocol.TType.LIST,
- new org.apache.thrift7.meta_data.StructMetaData(org.apache.thrift7.protocol.TType.STRUCT, TaskSummary.class))));
- metaDataMap = Collections.unmodifiableMap(tmpMap);
- org.apache.thrift7.meta_data.FieldMetaData.addStructMetaDataMap(WorkerSummary.class, metaDataMap);
- }
-
- public WorkerSummary() {
- }
-
- public WorkerSummary(
- int port,
- String topology,
- List<TaskSummary> tasks)
- {
- this();
- this.port = port;
- set_port_isSet(true);
- this.topology = topology;
- this.tasks = tasks;
- }
-
- /**
- * Performs a deep copy on <i>other</i>.
- */
- public WorkerSummary(WorkerSummary other) {
- __isset_bit_vector.clear();
- __isset_bit_vector.or(other.__isset_bit_vector);
- this.port = other.port;
- if (other.is_set_topology()) {
- this.topology = other.topology;
- }
- if (other.is_set_tasks()) {
- List<TaskSummary> __this__tasks = new ArrayList<TaskSummary>();
- for (TaskSummary other_element : other.tasks) {
- __this__tasks.add(new TaskSummary(other_element));
- }
- this.tasks = __this__tasks;
- }
- }
-
- public WorkerSummary deepCopy() {
- return new WorkerSummary(this);
- }
-
- @Override
- public void clear() {
- set_port_isSet(false);
- this.port = 0;
- this.topology = null;
- this.tasks = null;
- }
-
- public int get_port() {
- return this.port;
- }
-
- public void set_port(int port) {
- this.port = port;
- set_port_isSet(true);
- }
-
- public void unset_port() {
- __isset_bit_vector.clear(__PORT_ISSET_ID);
- }
-
- /** Returns true if field port is set (has been assigned a value) and false otherwise */
- public boolean is_set_port() {
- return __isset_bit_vector.get(__PORT_ISSET_ID);
- }
-
- public void set_port_isSet(boolean value) {
- __isset_bit_vector.set(__PORT_ISSET_ID, value);
- }
-
- public String get_topology() {
- return this.topology;
- }
-
- public void set_topology(String topology) {
- this.topology = topology;
- }
-
- public void unset_topology() {
- this.topology = null;
- }
-
- /** Returns true if field topology is set (has been assigned a value) and false otherwise */
- public boolean is_set_topology() {
- return this.topology != null;
- }
-
- public void set_topology_isSet(boolean value) {
- if (!value) {
- this.topology = null;
- }
- }
-
- public int get_tasks_size() {
- return (this.tasks == null) ? 0 : this.tasks.size();
- }
-
- public java.util.Iterator<TaskSummary> get_tasks_iterator() {
- return (this.tasks == null) ? null : this.tasks.iterator();
- }
-
- public void add_to_tasks(TaskSummary elem) {
- if (this.tasks == null) {
- this.tasks = new ArrayList<TaskSummary>();
- }
- this.tasks.add(elem);
- }
-
- public List<TaskSummary> get_tasks() {
- return this.tasks;
- }
-
- public void set_tasks(List<TaskSummary> tasks) {
- this.tasks = tasks;
- }
-
- public void unset_tasks() {
- this.tasks = null;
- }
-
- /** Returns true if field tasks is set (has been assigned a value) and false otherwise */
- public boolean is_set_tasks() {
- return this.tasks != null;
- }
-
- public void set_tasks_isSet(boolean value) {
- if (!value) {
- this.tasks = null;
- }
- }
-
- public void setFieldValue(_Fields field, Object value) {
- switch (field) {
- case PORT:
- if (value == null) {
- unset_port();
- } else {
- set_port((Integer)value);
- }
- break;
-
- case TOPOLOGY:
- if (value == null) {
- unset_topology();
- } else {
- set_topology((String)value);
- }
- break;
-
- case TASKS:
- if (value == null) {
- unset_tasks();
- } else {
- set_tasks((List<TaskSummary>)value);
- }
- break;
-
- }
- }
-
- public Object getFieldValue(_Fields field) {
- switch (field) {
- case PORT:
- return Integer.valueOf(get_port());
-
- case TOPOLOGY:
- return get_topology();
-
- case TASKS:
- return get_tasks();
-
- }
- throw new IllegalStateException();
- }
-
- /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
- public boolean isSet(_Fields field) {
- if (field == null) {
- throw new IllegalArgumentException();
- }
-
- switch (field) {
- case PORT:
- return is_set_port();
- case TOPOLOGY:
- return is_set_topology();
- case TASKS:
- return is_set_tasks();
- }
- throw new IllegalStateException();
- }
-
- @Override
- public boolean equals(Object that) {
- if (that == null)
- return false;
- if (that instanceof WorkerSummary)
- return this.equals((WorkerSummary)that);
- return false;
- }
-
- public boolean equals(WorkerSummary that) {
- if (that == null)
- return false;
-
- boolean this_present_port = true;
- boolean that_present_port = true;
- if (this_present_port || that_present_port) {
- if (!(this_present_port && that_present_port))
- return false;
- if (this.port != that.port)
- return false;
- }
-
- boolean this_present_topology = true && this.is_set_topology();
- boolean that_present_topology = true && that.is_set_topology();
- if (this_present_topology || that_present_topology) {
- if (!(this_present_topology && that_present_topology))
- return false;
- if (!this.topology.equals(that.topology))
- return false;
- }
-
- boolean this_present_tasks = true && this.is_set_tasks();
- boolean that_present_tasks = true && that.is_set_tasks();
- if (this_present_tasks || that_present_tasks) {
- if (!(this_present_tasks && that_present_tasks))
- return false;
- if (!this.tasks.equals(that.tasks))
- return false;
- }
-
- return true;
- }
-
- @Override
- public int hashCode() {
- HashCodeBuilder builder = new HashCodeBuilder();
-
- boolean present_port = true;
- builder.append(present_port);
- if (present_port)
- builder.append(port);
-
- boolean present_topology = true && (is_set_topology());
- builder.append(present_topology);
- if (present_topology)
- builder.append(topology);
-
- boolean present_tasks = true && (is_set_tasks());
- builder.append(present_tasks);
- if (present_tasks)
- builder.append(tasks);
-
- return builder.toHashCode();
- }
-
- public int compareTo(WorkerSummary other) {
- if (!getClass().equals(other.getClass())) {
- return getClass().getName().compareTo(other.getClass().getName());
- }
-
- int lastComparison = 0;
- WorkerSummary typedOther = (WorkerSummary)other;
-
- lastComparison = Boolean.valueOf(is_set_port()).compareTo(typedOther.is_set_port());
- if (lastComparison != 0) {
- return lastComparison;
- }
- if (is_set_port()) {
- lastComparison = org.apache.thrift7.TBaseHelper.compareTo(this.port, typedOther.port);
- if (lastComparison != 0) {
- return lastComparison;
- }
- }
- lastComparison = Boolean.valueOf(is_set_topology()).compareTo(typedOther.is_set_topology());
- if (lastComparison != 0) {
- return lastComparison;
- }
- if (is_set_topology()) {
- lastComparison = org.apache.thrift7.TBaseHelper.compareTo(this.topology, typedOther.topology);
- if (lastComparison != 0) {
- return lastComparison;
- }
- }
- lastComparison = Boolean.valueOf(is_set_tasks()).compareTo(typedOther.is_set_tasks());
- if (lastComparison != 0) {
- return lastComparison;
- }
- if (is_set_tasks()) {
- lastComparison = org.apache.thrift7.TBaseHelper.compareTo(this.tasks, typedOther.tasks);
- if (lastComparison != 0) {
- return lastComparison;
- }
- }
- return 0;
- }
-
- public _Fields fieldForId(int fieldId) {
- return _Fields.findByThriftId(fieldId);
- }
-
- public void read(org.apache.thrift7.protocol.TProtocol iprot) throws org.apache.thrift7.TException {
- org.apache.thrift7.protocol.TField field;
- iprot.readStructBegin();
- while (true)
- {
- field = iprot.readFieldBegin();
- if (field.type == org.apache.thrift7.protocol.TType.STOP) {
- break;
- }
- switch (field.id) {
- case 1: // PORT
- if (field.type == org.apache.thrift7.protocol.TType.I32) {
- this.port = iprot.readI32();
- set_port_isSet(true);
- } else {
- org.apache.thrift7.protocol.TProtocolUtil.skip(iprot, field.type);
- }
- break;
- case 2: // TOPOLOGY
- if (field.type == org.apache.thrift7.protocol.TType.STRING) {
- this.topology = iprot.readString();
- } else {
- org.apache.thrift7.protocol.TProtocolUtil.skip(iprot, field.type);
- }
- break;
- case 3: // TASKS
- if (field.type == org.apache.thrift7.protocol.TType.LIST) {
- {
- org.apache.thrift7.protocol.TList _list189 = iprot.readListBegin();
- this.tasks = new ArrayList<TaskSummary>(_list189.size);
- for (int _i190 = 0; _i190 < _list189.size; ++_i190)
- {
- TaskSummary _elem191; // required
- _elem191 = new TaskSummary();
- _elem191.read(iprot);
- this.tasks.add(_elem191);
- }
- iprot.readListEnd();
- }
- } else {
- org.apache.thrift7.protocol.TProtocolUtil.skip(iprot, field.type);
- }
- break;
- default:
- org.apache.thrift7.protocol.TProtocolUtil.skip(iprot, field.type);
- }
- iprot.readFieldEnd();
- }
- iprot.readStructEnd();
- validate();
- }
-
- public void write(org.apache.thrift7.protocol.TProtocol oprot) throws org.apache.thrift7.TException {
- validate();
-
- oprot.writeStructBegin(STRUCT_DESC);
- oprot.writeFieldBegin(PORT_FIELD_DESC);
- oprot.writeI32(this.port);
- oprot.writeFieldEnd();
- if (this.topology != null) {
- oprot.writeFieldBegin(TOPOLOGY_FIELD_DESC);
- oprot.writeString(this.topology);
- oprot.writeFieldEnd();
- }
- if (this.tasks != null) {
- oprot.writeFieldBegin(TASKS_FIELD_DESC);
- {
- oprot.writeListBegin(new org.apache.thrift7.protocol.TList(org.apache.thrift7.protocol.TType.STRUCT, this.tasks.size()));
- for (TaskSummary _iter192 : this.tasks)
- {
- _iter192.write(oprot);
- }
- oprot.writeListEnd();
- }
- oprot.writeFieldEnd();
- }
- oprot.writeFieldStop();
- oprot.writeStructEnd();
- }
-
- @Override
- public String toString() {
- StringBuilder sb = new StringBuilder("WorkerSummary(");
- boolean first = true;
-
- sb.append("port:");
- sb.append(this.port);
- first = false;
- if (!first) sb.append(", ");
- sb.append("topology:");
- if (this.topology == null) {
- sb.append("null");
- } else {
- sb.append(this.topology);
- }
- first = false;
- if (!first) sb.append(", ");
- sb.append("tasks:");
- if (this.tasks == null) {
- sb.append("null");
- } else {
- sb.append(this.tasks);
- }
- first = false;
- sb.append(")");
- return sb.toString();
- }
-
- public void validate() throws org.apache.thrift7.TException {
- // check for required fields
- if (!is_set_port()) {
- throw new org.apache.thrift7.protocol.TProtocolException("Required field 'port' is unset! Struct:" + toString());
- }
-
- if (!is_set_topology()) {
- throw new org.apache.thrift7.protocol.TProtocolException("Required field 'topology' is unset! Struct:" + toString());
- }
-
- if (!is_set_tasks()) {
- throw new org.apache.thrift7.protocol.TProtocolException("Required field 'tasks' is unset! Struct:" + toString());
- }
-
- }
-
- private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
- try {
- write(new org.apache.thrift7.protocol.TCompactProtocol(new org.apache.thrift7.transport.TIOStreamTransport(out)));
- } catch (org.apache.thrift7.TException te) {
- throw new java.io.IOException(te);
- }
- }
-
- private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
- try {
- // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor.
- __isset_bit_vector = new BitSet(1);
- read(new org.apache.thrift7.protocol.TCompactProtocol(new org.apache.thrift7.transport.TIOStreamTransport(in)));
- } catch (org.apache.thrift7.TException te) {
- throw new java.io.IOException(te);
- }
- }
-
-}
-
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/grouping/CustomStreamGrouping.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/grouping/CustomStreamGrouping.java b/jstorm-client/src/main/java/backtype/storm/grouping/CustomStreamGrouping.java
deleted file mode 100644
index 15e37a8..0000000
--- a/jstorm-client/src/main/java/backtype/storm/grouping/CustomStreamGrouping.java
+++ /dev/null
@@ -1,29 +0,0 @@
-package backtype.storm.grouping;
-
-import backtype.storm.generated.GlobalStreamId;
-import backtype.storm.task.WorkerTopologyContext;
-import java.io.Serializable;
-import java.util.List;
-
-public interface CustomStreamGrouping extends Serializable {
-
- /**
- * Tells the stream grouping at runtime the tasks in the target bolt. This
- * information should be used in chooseTasks to determine the target tasks.
- *
- * It also tells the grouping the metadata on the stream this grouping will
- * be used on.
- */
- void prepare(WorkerTopologyContext context, GlobalStreamId stream,
- List<Integer> targetTasks);
-
- /**
- * This function implements a custom stream grouping. It uses the target
- * tasks of the bolt that were supplied in prepare and returns the tasks
- * to send the tuples to.
- *
- * @param values
- * the values to group on
- */
- List<Integer> chooseTasks(int taskId, List<Object> values);
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/hooks/BaseTaskHook.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/hooks/BaseTaskHook.java b/jstorm-client/src/main/java/backtype/storm/hooks/BaseTaskHook.java
deleted file mode 100644
index a2aac33..0000000
--- a/jstorm-client/src/main/java/backtype/storm/hooks/BaseTaskHook.java
+++ /dev/null
@@ -1,44 +0,0 @@
-package backtype.storm.hooks;
-
-import backtype.storm.hooks.info.BoltAckInfo;
-import backtype.storm.hooks.info.BoltExecuteInfo;
-import backtype.storm.hooks.info.BoltFailInfo;
-import backtype.storm.hooks.info.EmitInfo;
-import backtype.storm.hooks.info.SpoutAckInfo;
-import backtype.storm.hooks.info.SpoutFailInfo;
-import backtype.storm.task.TopologyContext;
-import java.util.Map;
-
-public class BaseTaskHook implements ITaskHook {
- @Override
- public void prepare(Map conf, TopologyContext context) {
- }
-
- @Override
- public void cleanup() {
- }
-
- @Override
- public void emit(EmitInfo info) {
- }
-
- @Override
- public void spoutAck(SpoutAckInfo info) {
- }
-
- @Override
- public void spoutFail(SpoutFailInfo info) {
- }
-
- @Override
- public void boltAck(BoltAckInfo info) {
- }
-
- @Override
- public void boltFail(BoltFailInfo info) {
- }
-
- @Override
- public void boltExecute(BoltExecuteInfo info) {
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/hooks/ITaskHook.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/hooks/ITaskHook.java b/jstorm-client/src/main/java/backtype/storm/hooks/ITaskHook.java
deleted file mode 100644
index f705f12..0000000
--- a/jstorm-client/src/main/java/backtype/storm/hooks/ITaskHook.java
+++ /dev/null
@@ -1,28 +0,0 @@
-package backtype.storm.hooks;
-
-import backtype.storm.hooks.info.BoltAckInfo;
-import backtype.storm.hooks.info.BoltExecuteInfo;
-import backtype.storm.hooks.info.SpoutFailInfo;
-import backtype.storm.hooks.info.SpoutAckInfo;
-import backtype.storm.hooks.info.EmitInfo;
-import backtype.storm.hooks.info.BoltFailInfo;
-import backtype.storm.task.TopologyContext;
-import java.util.Map;
-
-public interface ITaskHook {
- void prepare(Map conf, TopologyContext context);
-
- void cleanup();
-
- void emit(EmitInfo info);
-
- void spoutAck(SpoutAckInfo info);
-
- void spoutFail(SpoutFailInfo info);
-
- void boltExecute(BoltExecuteInfo info);
-
- void boltAck(BoltAckInfo info);
-
- void boltFail(BoltFailInfo info);
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/hooks/info/BoltAckInfo.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/hooks/info/BoltAckInfo.java b/jstorm-client/src/main/java/backtype/storm/hooks/info/BoltAckInfo.java
deleted file mode 100644
index b0f0a9b..0000000
--- a/jstorm-client/src/main/java/backtype/storm/hooks/info/BoltAckInfo.java
+++ /dev/null
@@ -1,15 +0,0 @@
-package backtype.storm.hooks.info;
-
-import backtype.storm.tuple.Tuple;
-
-public class BoltAckInfo {
- public Tuple tuple;
- public int ackingTaskId;
- public Long processLatencyMs; // null if it wasn't sampled
-
- public BoltAckInfo(Tuple tuple, int ackingTaskId, Long processLatencyMs) {
- this.tuple = tuple;
- this.ackingTaskId = ackingTaskId;
- this.processLatencyMs = processLatencyMs;
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/hooks/info/BoltExecuteInfo.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/hooks/info/BoltExecuteInfo.java b/jstorm-client/src/main/java/backtype/storm/hooks/info/BoltExecuteInfo.java
deleted file mode 100644
index 31ca373..0000000
--- a/jstorm-client/src/main/java/backtype/storm/hooks/info/BoltExecuteInfo.java
+++ /dev/null
@@ -1,16 +0,0 @@
-package backtype.storm.hooks.info;
-
-import backtype.storm.tuple.Tuple;
-
-public class BoltExecuteInfo {
- public Tuple tuple;
- public int executingTaskId;
- public Long executeLatencyMs; // null if it wasn't sampled
-
- public BoltExecuteInfo(Tuple tuple, int executingTaskId,
- Long executeLatencyMs) {
- this.tuple = tuple;
- this.executingTaskId = executingTaskId;
- this.executeLatencyMs = executeLatencyMs;
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/hooks/info/BoltFailInfo.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/hooks/info/BoltFailInfo.java b/jstorm-client/src/main/java/backtype/storm/hooks/info/BoltFailInfo.java
deleted file mode 100644
index 3a3dfec..0000000
--- a/jstorm-client/src/main/java/backtype/storm/hooks/info/BoltFailInfo.java
+++ /dev/null
@@ -1,15 +0,0 @@
-package backtype.storm.hooks.info;
-
-import backtype.storm.tuple.Tuple;
-
-public class BoltFailInfo {
- public Tuple tuple;
- public int failingTaskId;
- public Long failLatencyMs; // null if it wasn't sampled
-
- public BoltFailInfo(Tuple tuple, int failingTaskId, Long failLatencyMs) {
- this.tuple = tuple;
- this.failingTaskId = failingTaskId;
- this.failLatencyMs = failLatencyMs;
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/hooks/info/EmitInfo.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/hooks/info/EmitInfo.java b/jstorm-client/src/main/java/backtype/storm/hooks/info/EmitInfo.java
deleted file mode 100644
index 39b9688..0000000
--- a/jstorm-client/src/main/java/backtype/storm/hooks/info/EmitInfo.java
+++ /dev/null
@@ -1,19 +0,0 @@
-package backtype.storm.hooks.info;
-
-import java.util.Collection;
-import java.util.List;
-
-public class EmitInfo {
- public List<Object> values;
- public String stream;
- public int taskId;
- public Collection<Integer> outTasks;
-
- public EmitInfo(List<Object> values, String stream, int taskId,
- Collection<Integer> outTasks) {
- this.values = values;
- this.stream = stream;
- this.taskId = taskId;
- this.outTasks = outTasks;
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/hooks/info/SpoutAckInfo.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/hooks/info/SpoutAckInfo.java b/jstorm-client/src/main/java/backtype/storm/hooks/info/SpoutAckInfo.java
deleted file mode 100644
index f74efae..0000000
--- a/jstorm-client/src/main/java/backtype/storm/hooks/info/SpoutAckInfo.java
+++ /dev/null
@@ -1,14 +0,0 @@
-package backtype.storm.hooks.info;
-
-public class SpoutAckInfo {
- public Object messageId;
- public int spoutTaskId;
- public Long completeLatencyMs; // null if it wasn't sampled
-
- public SpoutAckInfo(Object messageId, int spoutTaskId,
- Long completeLatencyMs) {
- this.messageId = messageId;
- this.spoutTaskId = spoutTaskId;
- this.completeLatencyMs = completeLatencyMs;
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/hooks/info/SpoutFailInfo.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/hooks/info/SpoutFailInfo.java b/jstorm-client/src/main/java/backtype/storm/hooks/info/SpoutFailInfo.java
deleted file mode 100644
index 8052b4a..0000000
--- a/jstorm-client/src/main/java/backtype/storm/hooks/info/SpoutFailInfo.java
+++ /dev/null
@@ -1,13 +0,0 @@
-package backtype.storm.hooks.info;
-
-public class SpoutFailInfo {
- public Object messageId;
- public int spoutTaskId;
- public Long failLatencyMs; // null if it wasn't sampled
-
- public SpoutFailInfo(Object messageId, int spoutTaskId, Long failLatencyMs) {
- this.messageId = messageId;
- this.spoutTaskId = spoutTaskId;
- this.failLatencyMs = failLatencyMs;
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/messaging/IConnection.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/messaging/IConnection.java b/jstorm-client/src/main/java/backtype/storm/messaging/IConnection.java
deleted file mode 100644
index f61e818..0000000
--- a/jstorm-client/src/main/java/backtype/storm/messaging/IConnection.java
+++ /dev/null
@@ -1,36 +0,0 @@
-package backtype.storm.messaging;
-
-import java.util.List;
-
-import backtype.storm.utils.DisruptorQueue;
-
-public interface IConnection {
-
- /**
- * Receives a message synchronously when flags != 1,
- * or asynchronously when flags == 1.
- *
- * @param flags
- * @return
- */
- public TaskMessage recv(int flags);
-
- /**
- * In the new design, the receive flow goes through registerQueue:
- * received messages are pushed into the registered queue.
- *
- * @param recvQueu
- */
- public void registerQueue(DisruptorQueue recvQueu);
- public void enqueue(TaskMessage message);
-
- public void send(List<TaskMessage> messages);
- public void send(TaskMessage message);
-
- /**
- * close this connection
- */
- public void close();
-
- public boolean isClosed();
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/messaging/IContext.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/messaging/IContext.java b/jstorm-client/src/main/java/backtype/storm/messaging/IContext.java
deleted file mode 100644
index 760b6e5..0000000
--- a/jstorm-client/src/main/java/backtype/storm/messaging/IContext.java
+++ /dev/null
@@ -1,58 +0,0 @@
-package backtype.storm.messaging;
-
-import java.util.Map;
-
-import backtype.storm.utils.DisruptorQueue;
-
-/**
- * This interface needs to be implemented for messaging plugin.
- *
- * Messaging plugin is specified via Storm config parameter,
- * storm.messaging.transport.
- *
- * A messaging plugin should have a default constructor and implement the IContext
- * interface. Upon construction, we will invoke IContext::prepare(storm_conf) to
- * enable the context to be configured according to the storm configuration.
- */
-public interface IContext {
- /**
- * This method is invoked at the startup of the messaging plugin
- *
- * @param storm_conf
- * storm configuration
- */
- public void prepare(Map storm_conf);
-
- /**
- * This method is invoked when a worker unloads the messaging plugin
- */
- public void term();
-
- /**
- * This method establishes a server side connection
- *
- * @param topology_id
- * topology ID
- * @param port
- * port #
- * @param distribute
- * true -- receive other worker's data
- * @return server side connection
- */
- public IConnection bind(String topology_id, int port);
-
- /**
- * This method establishes a client side connection to a remote server
- *
- * @param topology_id
- * topology ID
- * @param host
- * remote host
- * @param port
- * remote port
- * @param distribute
- * true -- send other worker data
- * @return client side connection
- */
- public IConnection connect(String topology_id, String host, int port);
-};
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/messaging/TaskMessage.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/messaging/TaskMessage.java b/jstorm-client/src/main/java/backtype/storm/messaging/TaskMessage.java
deleted file mode 100644
index cab968f..0000000
--- a/jstorm-client/src/main/java/backtype/storm/messaging/TaskMessage.java
+++ /dev/null
@@ -1,51 +0,0 @@
-package backtype.storm.messaging;
-
-import java.nio.ByteBuffer;
-
-public class TaskMessage {
- private int _task;
- private byte[] _message;
-
- public TaskMessage(int task, byte[] message) {
- _task = task;
- _message = message;
- }
-
- public int task() {
- return _task;
- }
-
- public byte[] message() {
- return _message;
- }
-
- public static boolean isEmpty(TaskMessage message) {
- if (message == null) {
- return true;
- }else if (message.message() == null) {
- return true;
- }else if (message.message().length == 0) {
- return true;
- }
-
- return false;
- }
-
- @Deprecated
- public ByteBuffer serialize() {
- ByteBuffer bb = ByteBuffer.allocate(_message.length + 2);
- bb.putShort((short) _task);
- bb.put(_message);
- return bb;
- }
-
- @Deprecated
- public void deserialize(ByteBuffer packet) {
- if (packet == null)
- return;
- _task = packet.getShort();
- _message = new byte[packet.limit() - 2];
- packet.get(_message);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/messaging/TransportFactory.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/messaging/TransportFactory.java b/jstorm-client/src/main/java/backtype/storm/messaging/TransportFactory.java
deleted file mode 100644
index 8830496..0000000
--- a/jstorm-client/src/main/java/backtype/storm/messaging/TransportFactory.java
+++ /dev/null
@@ -1,54 +0,0 @@
-package backtype.storm.messaging;
-
-import java.lang.reflect.Constructor;
-import java.lang.reflect.Method;
-import java.util.Map;
-
-import org.apache.log4j.Logger;
-
-import backtype.storm.Config;
-
-public class TransportFactory {
- public static final Logger LOG = Logger.getLogger(TransportFactory.class);
-
- public static IContext makeContext(Map storm_conf) {
-
- // get factory class name
- String transport_plugin_klassName = (String) storm_conf
- .get(Config.STORM_MESSAGING_TRANSPORT);
- LOG.info("JStorm peer transport plugin:" + transport_plugin_klassName);
-
- IContext transport = null;
- try {
- // create a factory class
- Class klass = Class.forName(transport_plugin_klassName);
- // obtain a context object
- // Object obj = klass.newInstance();
- Constructor constructor = klass.getDeclaredConstructor();
- constructor.setAccessible(true);
- Object obj = constructor.newInstance();
- LOG.info("TransportFactory makeContext: new klass: " + obj);
- if (obj instanceof IContext) {
- // case 1: plugin is a IContext class
- transport = (IContext) obj;
- // initialize with storm configuration
- transport.prepare(storm_conf);
- LOG.info("TransportFactory makeContext: start prepare... "
- + storm_conf);
- } else {
- // case 2: Non-IContext plugin must have a
- // makeContext(storm_conf) method that returns IContext object
- Method method = klass.getMethod("makeContext", Map.class);
- LOG.debug("object:" + obj + " method:" + method);
- transport = (IContext) method.invoke(obj, storm_conf);
- }
- LOG.info("TransportFactory makeContext done...");
- } catch (Exception e) {
- throw new RuntimeException(
- "Fail to construct messaging plugin from plugin "
- + transport_plugin_klassName, e);
- }
- return transport;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/metric/LoggingMetricsConsumer.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/metric/LoggingMetricsConsumer.java b/jstorm-client/src/main/java/backtype/storm/metric/LoggingMetricsConsumer.java
deleted file mode 100644
index 19c2235..0000000
--- a/jstorm-client/src/main/java/backtype/storm/metric/LoggingMetricsConsumer.java
+++ /dev/null
@@ -1,59 +0,0 @@
-package backtype.storm.metric;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Collection;
-import java.util.Map;
-
-import backtype.storm.metric.api.IMetricsConsumer;
-import backtype.storm.task.IErrorReporter;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.utils.Utils;
-
-/*
- * Listens for all metrics, dumps them to log
- *
- * To use, add this to your topology's configuration:
- * conf.registerMetricsConsumer(backtype.storm.metrics.LoggingMetricsConsumer.class, 1);
- *
- * Or edit the storm.yaml config file:
- *
- * topology.metrics.consumer.register:
- * - class: "backtype.storm.metrics.LoggingMetricsConsumer"
- * parallelism.hint: 1
- *
- */
-public class LoggingMetricsConsumer implements IMetricsConsumer {
- public static final Logger LOG = LoggerFactory
- .getLogger(LoggingMetricsConsumer.class);
-
- @Override
- public void prepare(Map stormConf, Object registrationArgument,
- TopologyContext context, IErrorReporter errorReporter) {
- }
-
- static private String padding = " ";
-
- @Override
- public void handleDataPoints(TaskInfo taskInfo,
- Collection<DataPoint> dataPoints) {
- StringBuilder sb = new StringBuilder();
- String header = String.format("%d\t%15s:%-4d\t%3d:%-11s\t",
- taskInfo.timestamp, taskInfo.srcWorkerHost,
- taskInfo.srcWorkerPort, taskInfo.srcTaskId,
- taskInfo.srcComponentId);
- sb.append(header);
- for (DataPoint p : dataPoints) {
- sb.delete(header.length(), sb.length());
- sb.append(p.name).append(padding)
- .delete(header.length() + 23, sb.length()).append("\t")
- .append(p.value);
- LOG.info(sb.toString());
- }
- }
-
- @Override
- public void cleanup() {
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/metric/MetricsConsumerBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/metric/MetricsConsumerBolt.java b/jstorm-client/src/main/java/backtype/storm/metric/MetricsConsumerBolt.java
deleted file mode 100644
index 994cb56..0000000
--- a/jstorm-client/src/main/java/backtype/storm/metric/MetricsConsumerBolt.java
+++ /dev/null
@@ -1,56 +0,0 @@
-package backtype.storm.metric;
-
-import backtype.storm.Config;
-import backtype.storm.metric.api.IMetricsConsumer;
-import backtype.storm.task.IBolt;
-import backtype.storm.task.IErrorReporter;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.tuple.Tuple;
-import java.util.Collection;
-import java.util.Map;
-
-public class MetricsConsumerBolt implements IBolt {
- IMetricsConsumer _metricsConsumer;
- String _consumerClassName;
- OutputCollector _collector;
- Object _registrationArgument;
-
- public MetricsConsumerBolt(String consumerClassName,
- Object registrationArgument) {
- _consumerClassName = consumerClassName;
- _registrationArgument = registrationArgument;
- }
-
- @Override
- public void prepare(Map stormConf, TopologyContext context,
- OutputCollector collector) {
- try {
- _metricsConsumer = (IMetricsConsumer) Class.forName(
- _consumerClassName).newInstance();
- } catch (Exception e) {
- throw new RuntimeException(
- "Could not instantiate a class listed in config under section "
- + Config.TOPOLOGY_METRICS_CONSUMER_REGISTER
- + " with fully qualified name "
- + _consumerClassName, e);
- }
- _metricsConsumer.prepare(stormConf, _registrationArgument, context,
- (IErrorReporter) collector);
- _collector = collector;
- }
-
- @Override
- public void execute(Tuple input) {
- _metricsConsumer.handleDataPoints(
- (IMetricsConsumer.TaskInfo) input.getValue(0),
- (Collection) input.getValue(1));
- _collector.ack(input);
- }
-
- @Override
- public void cleanup() {
- _metricsConsumer.cleanup();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/metric/SystemBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/metric/SystemBolt.java b/jstorm-client/src/main/java/backtype/storm/metric/SystemBolt.java
deleted file mode 100644
index 07bdc28..0000000
--- a/jstorm-client/src/main/java/backtype/storm/metric/SystemBolt.java
+++ /dev/null
@@ -1,157 +0,0 @@
-package backtype.storm.metric;
-
-import backtype.storm.Config;
-import backtype.storm.metric.api.AssignableMetric;
-import backtype.storm.metric.api.IMetric;
-import backtype.storm.task.IBolt;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.tuple.Tuple;
-import clojure.lang.AFn;
-import clojure.lang.IFn;
-import clojure.lang.RT;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.lang.management.*;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-// There is one task inside one executor for each worker of the topology.
-// TaskID is always -1, therefore you can only send-unanchored tuples to co-located SystemBolt.
-// This bolt was conceived to export worker stats via metrics api.
-public class SystemBolt implements IBolt {
- private static Logger LOG = LoggerFactory.getLogger(SystemBolt.class);
- private static boolean _prepareWasCalled = false;
-
- private static class MemoryUsageMetric implements IMetric {
- IFn _getUsage;
-
- public MemoryUsageMetric(IFn getUsage) {
- _getUsage = getUsage;
- }
-
- @Override
- public Object getValueAndReset() {
- MemoryUsage memUsage;
- try {
- memUsage = (MemoryUsage) _getUsage.invoke();
- } catch (Exception e) {
- LOG.error("Failed to get userage ", e);
- throw new RuntimeException(e);
- }
- HashMap m = new HashMap();
- m.put("maxBytes", memUsage.getMax());
- m.put("committedBytes", memUsage.getCommitted());
- m.put("initBytes", memUsage.getInit());
- m.put("usedBytes", memUsage.getUsed());
- m.put("virtualFreeBytes", memUsage.getMax() - memUsage.getUsed());
- m.put("unusedBytes", memUsage.getCommitted() - memUsage.getUsed());
- return m;
- }
- }
-
- // canonically the metrics data exported is time bucketed when doing counts.
- // convert the absolute values here into time buckets.
- private static class GarbageCollectorMetric implements IMetric {
- GarbageCollectorMXBean _gcBean;
- Long _collectionCount;
- Long _collectionTime;
-
- public GarbageCollectorMetric(GarbageCollectorMXBean gcBean) {
- _gcBean = gcBean;
- }
-
- @Override
- public Object getValueAndReset() {
- Long collectionCountP = _gcBean.getCollectionCount();
- Long collectionTimeP = _gcBean.getCollectionTime();
-
- Map ret = null;
- if (_collectionCount != null && _collectionTime != null) {
- ret = new HashMap();
- ret.put("count", collectionCountP - _collectionCount);
- ret.put("timeMs", collectionTimeP - _collectionTime);
- }
-
- _collectionCount = collectionCountP;
- _collectionTime = collectionTimeP;
- return ret;
- }
- }
-
- @Override
- public void prepare(final Map stormConf, TopologyContext context,
- OutputCollector collector) {
- if (_prepareWasCalled
- && !"local".equals(stormConf.get(Config.STORM_CLUSTER_MODE))) {
- throw new RuntimeException(
- "A single worker should have 1 SystemBolt instance.");
- }
- _prepareWasCalled = true;
-
- int bucketSize = RT.intCast(stormConf
- .get(Config.TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS));
-
- final RuntimeMXBean jvmRT = ManagementFactory.getRuntimeMXBean();
-
- context.registerMetric("uptimeSecs", new IMetric() {
- @Override
- public Object getValueAndReset() {
- return jvmRT.getUptime() / 1000.0;
- }
- }, bucketSize);
-
- context.registerMetric("startTimeSecs", new IMetric() {
- @Override
- public Object getValueAndReset() {
- return jvmRT.getStartTime() / 1000.0;
- }
- }, bucketSize);
-
- context.registerMetric("newWorkerEvent", new IMetric() {
- boolean doEvent = true;
-
- @Override
- public Object getValueAndReset() {
- if (doEvent) {
- doEvent = false;
- return 1;
- } else
- return 0;
- }
- }, bucketSize);
-
- final MemoryMXBean jvmMemRT = ManagementFactory.getMemoryMXBean();
-
- context.registerMetric("memory/heap", new MemoryUsageMetric(new AFn() {
- public Object invoke() {
- return jvmMemRT.getHeapMemoryUsage();
- }
- }), bucketSize);
- context.registerMetric("memory/nonHeap", new MemoryUsageMetric(
- new AFn() {
- public Object invoke() {
- return jvmMemRT.getNonHeapMemoryUsage();
- }
- }), bucketSize);
-
- for (GarbageCollectorMXBean b : ManagementFactory
- .getGarbageCollectorMXBeans()) {
- context.registerMetric("GC/" + b.getName().replaceAll("\\W", ""),
- new GarbageCollectorMetric(b), bucketSize);
- }
- }
-
- @Override
- public void execute(Tuple input) {
- throw new RuntimeException(
- "Non-system tuples should never be sent to __system bolt.");
- }
-
- @Override
- public void cleanup() {
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/metric/api/AssignableMetric.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/metric/api/AssignableMetric.java b/jstorm-client/src/main/java/backtype/storm/metric/api/AssignableMetric.java
deleted file mode 100644
index ed6dc72..0000000
--- a/jstorm-client/src/main/java/backtype/storm/metric/api/AssignableMetric.java
+++ /dev/null
@@ -1,17 +0,0 @@
-package backtype.storm.metric.api;
-
-/**
- * Metric whose value is assigned explicitly. Reading it does not clear it:
- * the last assigned value is reported until a new one is set.
- */
-public class AssignableMetric implements IMetric {
-    Object _value;
-
-    /** @param value the initial value to report */
-    public AssignableMetric(Object value) {
-        this._value = value;
-    }
-
-    /** Replaces the reported value. */
-    public void setValue(Object value) {
-        this._value = value;
-    }
-
-    /** @return the most recently assigned value (left unchanged) */
-    public Object getValueAndReset() {
-        return this._value;
-    }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/metric/api/CombinedMetric.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/metric/api/CombinedMetric.java b/jstorm-client/src/main/java/backtype/storm/metric/api/CombinedMetric.java
deleted file mode 100644
index cf74184..0000000
--- a/jstorm-client/src/main/java/backtype/storm/metric/api/CombinedMetric.java
+++ /dev/null
@@ -1,21 +0,0 @@
-package backtype.storm.metric.api;
-
-/**
- * Metric that folds every update into a running value using an
- * {@link ICombiner}, starting from (and resetting to) the combiner's identity.
- */
-public class CombinedMetric implements IMetric {
-    private final ICombiner _combiner;
-    private Object _value;
-
-    /** @param combiner supplies the identity value and the combine step */
-    public CombinedMetric(ICombiner combiner) {
-        this._combiner = combiner;
-        this._value = this._combiner.identity();
-    }
-
-    /** Combines {@code value} into the running value. */
-    public void update(Object value) {
-        Object combined = _combiner.combine(_value, value);
-        _value = combined;
-    }
-
-    /** @return the running value; the metric then restarts from the identity */
-    public Object getValueAndReset() {
-        final Object snapshot = _value;
-        _value = _combiner.identity();
-        return snapshot;
-    }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/metric/api/CountMetric.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/metric/api/CountMetric.java b/jstorm-client/src/main/java/backtype/storm/metric/api/CountMetric.java
deleted file mode 100644
index 12694cd..0000000
--- a/jstorm-client/src/main/java/backtype/storm/metric/api/CountMetric.java
+++ /dev/null
@@ -1,24 +0,0 @@
-package backtype.storm.metric.api;
-
-import backtype.storm.metric.api.IMetric;
-
-/**
- * Simple counter metric: accumulates increments and reports the total since
- * the previous read.
- */
-public class CountMetric implements IMetric {
-    long _value = 0;
-
-    public CountMetric() {
-    }
-
-    /** Adds one to the counter. */
-    public void incr() {
-        _value += 1;
-    }
-
-    /** Adds {@code incrementBy} to the counter. */
-    public void incrBy(long incrementBy) {
-        _value = _value + incrementBy;
-    }
-
-    /** @return the accumulated count (boxed as a Long); the counter restarts at zero */
-    public Object getValueAndReset() {
-        final long snapshot = _value;
-        _value = 0;
-        return snapshot;
-    }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/metric/api/ICombiner.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/metric/api/ICombiner.java b/jstorm-client/src/main/java/backtype/storm/metric/api/ICombiner.java
deleted file mode 100644
index cdc9363..0000000
--- a/jstorm-client/src/main/java/backtype/storm/metric/api/ICombiner.java
+++ /dev/null
@@ -1,7 +0,0 @@
-package backtype.storm.metric.api;
-
-/**
- * Combining strategy used by CombinedMetric to fold successive updates into a
- * single running value.
- *
- * @param <T> type of the combined value
- */
-public interface ICombiner<T> {
- /**
-  * @return the starting value; CombinedMetric uses it as its initial value
-  *         and again after every read
-  */
- public T identity();
-
- /**
-  * Merges two values into one. CombinedMetric passes its running value as
-  * {@code a} and the new update as {@code b}.
-  */
- public T combine(T a, T b);
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/metric/api/IMetric.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/metric/api/IMetric.java b/jstorm-client/src/main/java/backtype/storm/metric/api/IMetric.java
deleted file mode 100644
index cd50757..0000000
--- a/jstorm-client/src/main/java/backtype/storm/metric/api/IMetric.java
+++ /dev/null
@@ -1,5 +0,0 @@
-package backtype.storm.metric.api;
-
-/**
- * A metric that can be read as a single value.
- */
-public interface IMetric {
- /**
-  * Returns the metric's current value. Whether internal state is reset is
-  * up to the implementation: CountMetric, for example, restarts from zero,
-  * while AssignableMetric keeps its value.
-  */
- public Object getValueAndReset();
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/metric/api/IMetricsConsumer.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/metric/api/IMetricsConsumer.java b/jstorm-client/src/main/java/backtype/storm/metric/api/IMetricsConsumer.java
deleted file mode 100644
index 51b8d5b..0000000
--- a/jstorm-client/src/main/java/backtype/storm/metric/api/IMetricsConsumer.java
+++ /dev/null
@@ -1,56 +0,0 @@
-package backtype.storm.metric.api;
-
-import backtype.storm.task.IErrorReporter;
-import backtype.storm.task.TopologyContext;
-import java.util.Collection;
-import java.util.Map;
-
-/**
- * Receiver of metric data points. MetricsConsumerBolt instantiates an
- * implementation, calls {@link #prepare} once, forwards incoming data to
- * {@link #handleDataPoints} and calls {@link #cleanup} when it is cleaned up.
- */
-public interface IMetricsConsumer {
-    /** Describes the task a batch of data points came from. */
-    public static class TaskInfo {
-        public String srcWorkerHost;
-        public int srcWorkerPort;
-        public String srcComponentId;
-        public int srcTaskId;
-        public long timestamp;
-        public int updateIntervalSecs;
-
-        /** Creates an instance with all fields at their default values. */
-        public TaskInfo() {
-        }
-
-        /** Creates an instance with every field set from the matching argument. */
-        public TaskInfo(String srcWorkerHost, int srcWorkerPort,
-                String srcComponentId, int srcTaskId, long timestamp,
-                int updateIntervalSecs) {
-            this.srcWorkerHost = srcWorkerHost;
-            this.srcWorkerPort = srcWorkerPort;
-            this.srcComponentId = srcComponentId;
-            this.srcTaskId = srcTaskId;
-            this.timestamp = timestamp;
-            this.updateIntervalSecs = updateIntervalSecs;
-        }
-    }
-
-    /** A single named metric value. */
-    public static class DataPoint {
-        public String name;
-        public Object value;
-
-        /** Creates an instance with a null name and value. */
-        public DataPoint() {
-        }
-
-        public DataPoint(String name, Object value) {
-            this.name = name;
-            this.value = value;
-        }
-
-        /** @return the data point rendered as {@code [name = value]} */
-        @Override
-        public String toString() {
-            StringBuilder text = new StringBuilder();
-            text.append("[").append(name).append(" = ").append(value).append("]");
-            return text.toString();
-        }
-    }
-
-    /**
-     * Initializes the consumer.
-     *
-     * @param stormConf            the storm configuration
-     * @param registrationArgument the argument supplied when the consumer was registered
-     * @param context              the topology context of the hosting task
-     * @param errorReporter        reporter the consumer may use for errors
-     */
-    void prepare(Map stormConf, Object registrationArgument,
-            TopologyContext context, IErrorReporter errorReporter);
-
-    /**
-     * Handles a batch of data points originating from one task.
-     *
-     * @param taskInfo   origin of the data points
-     * @param dataPoints the data points
-     */
-    void handleDataPoints(TaskInfo taskInfo, Collection<DataPoint> dataPoints);
-
-    /** Releases any resources held by the consumer. */
-    void cleanup();
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/metric/api/IReducer.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/metric/api/IReducer.java b/jstorm-client/src/main/java/backtype/storm/metric/api/IReducer.java
deleted file mode 100644
index fe221ae..0000000
--- a/jstorm-client/src/main/java/backtype/storm/metric/api/IReducer.java
+++ /dev/null
@@ -1,9 +0,0 @@
-package backtype.storm.metric.api;
-
-/**
- * Reduction strategy used by ReducedMetric: updates are folded into an
- * accumulator from which the reported result is extracted.
- *
- * @param <T> type of the accumulator
- */
-public interface IReducer<T> {
- /**
-  * @return a fresh accumulator; ReducedMetric requests one at construction
-  *         and again after every read
-  */
- T init();
-
- /**
-  * Folds {@code input} into {@code accumulator} and returns the accumulator
-  * to use from now on.
-  */
- T reduce(T accumulator, Object input);
-
- /** Converts the accumulator into the value reported for the metric. */
- Object extractResult(T accumulator);
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/metric/api/IStatefulObject.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/metric/api/IStatefulObject.java b/jstorm-client/src/main/java/backtype/storm/metric/api/IStatefulObject.java
deleted file mode 100644
index ab37b2c..0000000
--- a/jstorm-client/src/main/java/backtype/storm/metric/api/IStatefulObject.java
+++ /dev/null
@@ -1,5 +0,0 @@
-package backtype.storm.metric.api;
-
-/**
- * An object exposing a state value; StateMetric reports that value as a metric.
- */
-public interface IStatefulObject {
- /** @return the object's current state */
- Object getState();
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/metric/api/MeanReducer.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/metric/api/MeanReducer.java b/jstorm-client/src/main/java/backtype/storm/metric/api/MeanReducer.java
deleted file mode 100644
index 86f4593..0000000
--- a/jstorm-client/src/main/java/backtype/storm/metric/api/MeanReducer.java
+++ /dev/null
@@ -1,39 +0,0 @@
-package backtype.storm.metric.api;
-
-import backtype.storm.metric.api.IReducer;
-
-class MeanReducerState {
- public int count = 0;
- public double sum = 0.0;
-}
-
-public class MeanReducer implements IReducer<MeanReducerState> {
- public MeanReducerState init() {
- return new MeanReducerState();
- }
-
- public MeanReducerState reduce(MeanReducerState acc, Object input) {
- acc.count++;
- if (input instanceof Double) {
- acc.sum += (Double) input;
- } else if (input instanceof Long) {
- acc.sum += ((Long) input).doubleValue();
- } else if (input instanceof Integer) {
- acc.sum += ((Integer) input).doubleValue();
- } else {
- throw new RuntimeException(
- "MeanReducer::reduce called with unsupported input type `"
- + input.getClass()
- + "`. Supported types are Double, Long, Integer.");
- }
- return acc;
- }
-
- public Object extractResult(MeanReducerState acc) {
- if (acc.count > 0) {
- return new Double(acc.sum / (double) acc.count);
- } else {
- return null;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/metric/api/MultiCountMetric.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/metric/api/MultiCountMetric.java b/jstorm-client/src/main/java/backtype/storm/metric/api/MultiCountMetric.java
deleted file mode 100644
index f550eeb..0000000
--- a/jstorm-client/src/main/java/backtype/storm/metric/api/MultiCountMetric.java
+++ /dev/null
@@ -1,28 +0,0 @@
-package backtype.storm.metric.api;
-
-import backtype.storm.metric.api.IMetric;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * A family of {@link CountMetric}s addressed by string key; counters are
- * created on first use.
- */
-public class MultiCountMetric implements IMetric {
-    Map<String, CountMetric> _value = new HashMap();
-
-    public MultiCountMetric() {
-    }
-
-    /** @return the counter registered under {@code key}, creating it if absent */
-    public CountMetric scope(String key) {
-        CountMetric counter = _value.get(key);
-        if (counter != null) {
-            return counter;
-        }
-        counter = new CountMetric();
-        _value.put(key, counter);
-        return counter;
-    }
-
-    /**
-     * @return a new map from key to that counter's count; every counter is
-     *         reset but stays registered
-     */
-    public Object getValueAndReset() {
-        Map snapshot = new HashMap();
-        for (Map.Entry<String, CountMetric> entry : _value.entrySet()) {
-            CountMetric counter = entry.getValue();
-            snapshot.put(entry.getKey(), counter.getValueAndReset());
-        }
-        return snapshot;
-    }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/metric/api/MultiReducedMetric.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/metric/api/MultiReducedMetric.java b/jstorm-client/src/main/java/backtype/storm/metric/api/MultiReducedMetric.java
deleted file mode 100644
index 5020fd8..0000000
--- a/jstorm-client/src/main/java/backtype/storm/metric/api/MultiReducedMetric.java
+++ /dev/null
@@ -1,33 +0,0 @@
-package backtype.storm.metric.api;
-
-import backtype.storm.metric.api.IMetric;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * A family of {@link ReducedMetric}s addressed by string key, all sharing one
- * reducer; metrics are created on first use.
- */
-public class MultiReducedMetric implements IMetric {
-    Map<String, ReducedMetric> _value = new HashMap();
-    IReducer _reducer;
-
-    /** @param reducer the reducer handed to every scoped metric */
-    public MultiReducedMetric(IReducer reducer) {
-        this._reducer = reducer;
-    }
-
-    /** @return the metric registered under {@code key}, creating it if absent */
-    public ReducedMetric scope(String key) {
-        ReducedMetric metric = _value.get(key);
-        if (metric != null) {
-            return metric;
-        }
-        metric = new ReducedMetric(_reducer);
-        _value.put(key, metric);
-        return metric;
-    }
-
-    /**
-     * @return a new map from key to that metric's result, omitting keys whose
-     *         result is null; every metric is reset
-     */
-    public Object getValueAndReset() {
-        Map snapshot = new HashMap();
-        for (Map.Entry<String, ReducedMetric> entry : _value.entrySet()) {
-            Object result = entry.getValue().getValueAndReset();
-            if (result == null) {
-                continue;
-            }
-            snapshot.put(entry.getKey(), result);
-        }
-        return snapshot;
-    }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/metric/api/ReducedMetric.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/metric/api/ReducedMetric.java b/jstorm-client/src/main/java/backtype/storm/metric/api/ReducedMetric.java
deleted file mode 100644
index b2a7bf8..0000000
--- a/jstorm-client/src/main/java/backtype/storm/metric/api/ReducedMetric.java
+++ /dev/null
@@ -1,21 +0,0 @@
-package backtype.storm.metric.api;
-
-/**
- * Metric that folds every update into an accumulator using an
- * {@link IReducer} and reports the result extracted from it.
- */
-public class ReducedMetric implements IMetric {
-    private final IReducer _reducer;
-    private Object _accumulator;
-
-    /** @param reducer supplies the accumulator, the reduce step and the result extraction */
-    public ReducedMetric(IReducer reducer) {
-        this._reducer = reducer;
-        this._accumulator = this._reducer.init();
-    }
-
-    /** Reduces {@code value} into the accumulator. */
-    public void update(Object value) {
-        Object next = _reducer.reduce(_accumulator, value);
-        _accumulator = next;
-    }
-
-    /** @return the result extracted from the accumulator, which is then replaced by a fresh one */
-    public Object getValueAndReset() {
-        final Object result = _reducer.extractResult(_accumulator);
-        _accumulator = _reducer.init();
-        return result;
-    }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/metric/api/StateMetric.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/metric/api/StateMetric.java b/jstorm-client/src/main/java/backtype/storm/metric/api/StateMetric.java
deleted file mode 100644
index 48170ff..0000000
--- a/jstorm-client/src/main/java/backtype/storm/metric/api/StateMetric.java
+++ /dev/null
@@ -1,14 +0,0 @@
-package backtype.storm.metric.api;
-
-/**
- * Metric that reports the current state of an {@link IStatefulObject}.
- */
-public class StateMetric implements IMetric {
-    private IStatefulObject _obj;
-
-    /** @param obj the object whose state is reported */
-    public StateMetric(IStatefulObject obj) {
-        this._obj = obj;
-    }
-
-    /** @return whatever the wrapped object's {@code getState()} returns */
-    @Override
-    public Object getValueAndReset() {
-        final Object state = _obj.getState();
-        return state;
-    }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/metric/api/rpc/AssignableShellMetric.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/metric/api/rpc/AssignableShellMetric.java b/jstorm-client/src/main/java/backtype/storm/metric/api/rpc/AssignableShellMetric.java
deleted file mode 100644
index 20387ed..0000000
--- a/jstorm-client/src/main/java/backtype/storm/metric/api/rpc/AssignableShellMetric.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package backtype.storm.metric.api.rpc;
-
-import backtype.storm.metric.api.AssignableMetric;
-
-/**
- * {@link AssignableMetric} that can be updated through the shell-metric RPC
- * entry point.
- */
-public class AssignableShellMetric extends AssignableMetric implements IShellMetric {
-    /** @param value the initial value to report */
-    public AssignableShellMetric(Object value) {
-        super(value);
-    }
-
-    /** Assigns {@code value} as the new metric value. */
-    public void updateMetricFromRPC(Object value) {
-        this.setValue(value);
-    }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/metric/api/rpc/CombinedShellMetric.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/metric/api/rpc/CombinedShellMetric.java b/jstorm-client/src/main/java/backtype/storm/metric/api/rpc/CombinedShellMetric.java
deleted file mode 100644
index 231c571..0000000
--- a/jstorm-client/src/main/java/backtype/storm/metric/api/rpc/CombinedShellMetric.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package backtype.storm.metric.api.rpc;
-
-import backtype.storm.metric.api.CombinedMetric;
-import backtype.storm.metric.api.ICombiner;
-
-/**
- * {@link CombinedMetric} that can be updated through the shell-metric RPC
- * entry point.
- */
-public class CombinedShellMetric extends CombinedMetric implements IShellMetric {
-    /** @param combiner the combiner used to fold updates */
-    public CombinedShellMetric(ICombiner combiner) {
-        super(combiner);
-    }
-
-    /** Combines {@code value} into the running value. */
-    public void updateMetricFromRPC(Object value) {
-        this.update(value);
-    }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/metric/api/rpc/CountShellMetric.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/metric/api/rpc/CountShellMetric.java b/jstorm-client/src/main/java/backtype/storm/metric/api/rpc/CountShellMetric.java
deleted file mode 100644
index def74c2..0000000
--- a/jstorm-client/src/main/java/backtype/storm/metric/api/rpc/CountShellMetric.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package backtype.storm.metric.api.rpc;
-
-import backtype.storm.metric.api.CountMetric;
-
-/**
- * {@link CountMetric} that can be updated through the shell-metric RPC entry
- * point.
- */
-public class CountShellMetric extends CountMetric implements IShellMetric {
-    /**
-     * Increments the counter.
-     *
-     * @param value {@code null} to increment by one, or a {@code Long} to
-     *              increment by that amount
-     * @throws RuntimeException if {@code value} is neither null nor a Long
-     */
-    public void updateMetricFromRPC(Object value) {
-        if (value instanceof Long) {
-            incrBy((Long) value);
-            return;
-        }
-        if (value == null) {
-            incr();
-            return;
-        }
-        throw new RuntimeException("CountShellMetric updateMetricFromRPC params should be null or Long");
-    }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/metric/api/rpc/IShellMetric.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/metric/api/rpc/IShellMetric.java b/jstorm-client/src/main/java/backtype/storm/metric/api/rpc/IShellMetric.java
deleted file mode 100644
index d53baea..0000000
--- a/jstorm-client/src/main/java/backtype/storm/metric/api/rpc/IShellMetric.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package backtype.storm.metric.api.rpc;
-
-import backtype.storm.metric.api.IMetric;
-
-public interface IShellMetric extends IMetric {
- /**
-  * Updates the metric. This method is used by ShellBolt and ShellSpout,
-  * through an RPC call, to update the metric.
-  *
-  * @param value
-  *            the value used to update the metric; its meaning depends on
-  *            the implementation. It can be any JSON-supported type:
-  *            String, Long, Double, Boolean, Null, List, Map
-  */
- public void updateMetricFromRPC(Object value);
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/metric/api/rpc/ReducedShellMetric.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/metric/api/rpc/ReducedShellMetric.java b/jstorm-client/src/main/java/backtype/storm/metric/api/rpc/ReducedShellMetric.java
deleted file mode 100644
index 097ed51..0000000
--- a/jstorm-client/src/main/java/backtype/storm/metric/api/rpc/ReducedShellMetric.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package backtype.storm.metric.api.rpc;
-
-import backtype.storm.metric.api.IReducer;
-import backtype.storm.metric.api.ReducedMetric;
-
-/**
- * {@link ReducedMetric} that can be updated through the shell-metric RPC
- * entry point.
- */
-public class ReducedShellMetric extends ReducedMetric implements IShellMetric {
-
-    /** @param reducer the reducer used to fold updates */
-    public ReducedShellMetric(IReducer reducer) {
-        super(reducer);
-    }
-
-    /** Reduces {@code value} into the accumulator. */
-    public void updateMetricFromRPC(Object value) {
-        this.update(value);
-    }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/multilang/BoltMsg.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/multilang/BoltMsg.java b/jstorm-client/src/main/java/backtype/storm/multilang/BoltMsg.java
deleted file mode 100644
index 446bdc4..0000000
--- a/jstorm-client/src/main/java/backtype/storm/multilang/BoltMsg.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package backtype.storm.multilang;
-
-import java.util.List;
-
-/**
- * BoltMsg is an object that represents the data sent from a shell component to
- * a bolt process that implements a multi-language protocol. It is the union of
- * all data types that a bolt can receive from Storm.
- *
- * <p>
- * BoltMsgs are objects sent to the ISerializer interface, for serialization
- * according to the wire protocol implemented by the serializer. The BoltMsg
- * class allows for a decoupling between the serialized representation of the
- * data and the data itself.
- * </p>
- */
-public class BoltMsg {
- private String id;
- private String comp;
- private String stream;
- private long task;
- private List<Object> tuple;
-
- public String getId() {
- return id;
- }
-
- public void setId(String id) {
- this.id = id;
- }
-
- public String getComp() {
- return comp;
- }
-
- public void setComp(String comp) {
- this.comp = comp;
- }
-
- public String getStream() {
- return stream;
- }
-
- public void setStream(String stream) {
- this.stream = stream;
- }
-
- public long getTask() {
- return task;
- }
-
- public void setTask(long task) {
- this.task = task;
- }
-
- public List<Object> getTuple() {
- return tuple;
- }
-
- public void setTuple(List<Object> tuple) {
- this.tuple = tuple;
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/multilang/ISerializer.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/multilang/ISerializer.java b/jstorm-client/src/main/java/backtype/storm/multilang/ISerializer.java
deleted file mode 100644
index c9c7ad4..0000000
--- a/jstorm-client/src/main/java/backtype/storm/multilang/ISerializer.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package backtype.storm.multilang;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.Serializable;
-import java.util.List;
-import java.util.Map;
-
-import backtype.storm.task.TopologyContext;
-
-/**
- * The ISerializer interface describes the methods that an object should
- * implement to provide serialization and de-serialization capabilities to
- * non-JVM language components.
- */
-public interface ISerializer extends Serializable {
-
- /**
- * This method sets the input and output streams of the serializer
- *
- * @param processIn output stream to non-JVM component
- * @param processOut input stream from non-JVM component
- */
- void initialize(OutputStream processIn, InputStream processOut);
-
- /**
- * This method transmits the Storm config to the non-JVM process and
- * receives its pid.
- *
- * @param conf storm configuration
- * @param context topology context
- * @return process pid
- */
- Number connect(Map conf, TopologyContext context) throws IOException,
- NoOutputException;
-
- /**
- * This method receives a shell message from the non-JVM process
- *
- * @return shell message
- */
- ShellMsg readShellMsg() throws IOException, NoOutputException;
-
- /**
- * This method sends a bolt message to a non-JVM bolt process
- *
- * @param msg bolt message
- */
- void writeBoltMsg(BoltMsg msg) throws IOException;
-
- /**
- * This method sends a spout message to a non-JVM spout process
- *
- * @param msg spout message
- */
- void writeSpoutMsg(SpoutMsg msg) throws IOException;
-
- /**
- * This method sends a list of task IDs to a non-JVM bolt process
- *
- * @param taskIds list of task IDs
- */
- void writeTaskIds(List<Integer> taskIds) throws IOException;
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/multilang/JsonSerializer.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/multilang/JsonSerializer.java b/jstorm-client/src/main/java/backtype/storm/multilang/JsonSerializer.java
deleted file mode 100644
index 9fca312..0000000
--- a/jstorm-client/src/main/java/backtype/storm/multilang/JsonSerializer.java
+++ /dev/null
@@ -1,202 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package backtype.storm.multilang;
-
-import java.io.BufferedReader;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.OutputStream;
-import java.io.UnsupportedEncodingException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-
-import org.json.simple.JSONObject;
-import org.json.simple.JSONValue;
-
-import backtype.storm.task.TopologyContext;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.utils.Utils;
-
-/**
- * JsonSerializer implements the JSON multilang protocol.
- */
-public class JsonSerializer implements ISerializer {
- private DataOutputStream processIn;
- private BufferedReader processOut;
-
- public void initialize(OutputStream processIn, InputStream processOut) {
- this.processIn = new DataOutputStream(processIn);
- try {
- this.processOut = new BufferedReader(new InputStreamReader(processOut, "UTF-8"));
- } catch (UnsupportedEncodingException e) {
- throw new RuntimeException(e);
- }
- }
-
- public Number connect(Map conf, TopologyContext context)
- throws IOException, NoOutputException {
- JSONObject setupInfo = new JSONObject();
- setupInfo.put("pidDir", context.getPIDDir());
- setupInfo.put("conf", conf);
- setupInfo.put("context", context);
- writeMessage(setupInfo);
-
- Number pid = (Number) ((JSONObject) readMessage()).get("pid");
- return pid;
- }
-
- public void writeBoltMsg(BoltMsg boltMsg) throws IOException {
- JSONObject obj = new JSONObject();
- obj.put("id", boltMsg.getId());
- obj.put("comp", boltMsg.getComp());
- obj.put("stream", boltMsg.getStream());
- obj.put("task", boltMsg.getTask());
- obj.put("tuple", boltMsg.getTuple());
- writeMessage(obj);
- }
-
- public void writeSpoutMsg(SpoutMsg msg) throws IOException {
- JSONObject obj = new JSONObject();
- obj.put("command", msg.getCommand());
- obj.put("id", msg.getId());
- writeMessage(obj);
- }
-
- public void writeTaskIds(List<Integer> taskIds) throws IOException {
- writeMessage(taskIds);
- }
-
- private void writeMessage(Object msg) throws IOException {
- writeString(JSONValue.toJSONString(msg));
- }
-
- private void writeString(String str) throws IOException {
- byte[] strBytes = str.getBytes("UTF-8");
- processIn.write(strBytes, 0, strBytes.length);
- processIn.writeBytes("\nend\n");
- processIn.flush();
- }
-
- public ShellMsg readShellMsg() throws IOException, NoOutputException {
- JSONObject msg = (JSONObject) readMessage();
- ShellMsg shellMsg = new ShellMsg();
-
- String command = (String) msg.get("command");
- shellMsg.setCommand(command);
-
- Object id = msg.get("id");
- shellMsg.setId(id);
-
- String log = (String) msg.get("msg");
- shellMsg.setMsg(log);
-
- String stream = (String) msg.get("stream");
- if (stream == null)
- stream = Utils.DEFAULT_STREAM_ID;
- shellMsg.setStream(stream);
-
- Object taskObj = msg.get("task");
- if (taskObj != null) {
- shellMsg.setTask((Long) taskObj);
- } else {
- shellMsg.setTask(0);
- }
-
- Object need_task_ids = msg.get("need_task_ids");
- if (need_task_ids == null || ((Boolean) need_task_ids).booleanValue()) {
- shellMsg.setNeedTaskIds(true);
- } else {
- shellMsg.setNeedTaskIds(false);
- }
-
- shellMsg.setTuple((List) msg.get("tuple"));
-
- //List<Tuple> anchors = new ArrayList<Tuple>();
- Object anchorObj = msg.get("anchors");
- if (anchorObj != null) {
- if (anchorObj instanceof String) {
- anchorObj = Arrays.asList(anchorObj);
- }
- for (Object o : (List) anchorObj) {
- shellMsg.addAnchor((String) o);
- }
- }
-
- Object nameObj = msg.get("name");
- String metricName = null;
- if (nameObj != null && nameObj instanceof String) {
- metricName = (String) nameObj;
- }
- shellMsg.setMetricName(metricName);
-
- Object paramsObj = msg.get("params");
- shellMsg.setMetricParams(paramsObj);
-
- if (command.equals("log")) {
- Object logLevelObj = msg.get("level");
- if (logLevelObj != null && logLevelObj instanceof Long) {
- long logLevel = (Long)logLevelObj;
- shellMsg.setLogLevel((int)logLevel);
- }
- }
-
- return shellMsg;
- }
-
- private Object readMessage() throws IOException, NoOutputException {
- String string = readString();
- Object msg = JSONValue.parse(string);
- if (msg != null) {
- return msg;
- } else {
- throw new IOException("unable to parse: " + string);
- }
- }
-
- private String readString() throws IOException, NoOutputException {
- StringBuilder line = new StringBuilder();
-
- while (true) {
- String subline = processOut.readLine();
- if (subline == null) {
- StringBuilder errorMessage = new StringBuilder();
- errorMessage.append("Pipe to subprocess seems to be broken!");
- if (line.length() == 0) {
- errorMessage.append(" No output read.\n");
- } else {
- errorMessage.append(" Currently read output: "
- + line.toString() + "\n");
- }
- errorMessage.append("Serializer Exception:\n");
- throw new NoOutputException(errorMessage.toString());
- }
- if (subline.equals("end")) {
- break;
- }
- if (line.length() != 0) {
- line.append("\n");
- }
- line.append(subline);
- }
- return line.toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/multilang/NoOutputException.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/multilang/NoOutputException.java b/jstorm-client/src/main/java/backtype/storm/multilang/NoOutputException.java
deleted file mode 100644
index 1ce75d3..0000000
--- a/jstorm-client/src/main/java/backtype/storm/multilang/NoOutputException.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package backtype.storm.multilang;
-
-/**
- * A NoOutputException states that no data has been received from the connected
- * non-JVM process.
- */
-public class NoOutputException extends Exception {
- public NoOutputException() {
- super();
- }
-
- public NoOutputException(String message) {
- super(message);
- }
-
- public NoOutputException(String message, Throwable cause) {
- super(message, cause);
- }
-
- public NoOutputException(Throwable cause) {
- super(cause);
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/multilang/ShellMsg.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/multilang/ShellMsg.java b/jstorm-client/src/main/java/backtype/storm/multilang/ShellMsg.java
deleted file mode 100644
index 9eafb1a..0000000
--- a/jstorm-client/src/main/java/backtype/storm/multilang/ShellMsg.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package backtype.storm.multilang;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * ShellMsg is an object that represents the data sent to a shell component from
- * a process that implements a multi-language protocol. It is the union of all
- * data types that a component can send to Storm.
- *
- * <p>
- * ShellMsgs are objects received from the ISerializer interface, after the
- * serializer has deserialized the data from the underlying wire protocol. The
- * ShellMsg class allows for a decoupling between the serialized representation
- * of the data and the data itself.
- * </p>
- */
-public class ShellMsg {
- private String command;
- private Object id;
- private List<String> anchors;
- private String stream;
- private long task;
- private String msg;
- private List<Object> tuple;
- private boolean needTaskIds;
-
- //metrics rpc
- private String metricName;
- private Object metricParams;
-
- //logLevel
- public enum ShellLogLevel {
- TRACE, DEBUG, INFO, WARN, ERROR;
-
- public static ShellLogLevel fromInt(int i) {
- switch (i) {
- case 0: return TRACE;
- case 1: return DEBUG;
- case 2: return INFO;
- case 3: return WARN;
- case 4: return ERROR;
- default: return INFO;
- }
- }
- }
-
- private ShellLogLevel logLevel = ShellLogLevel.INFO;
-
- public String getCommand() {
- return command;
- }
-
- public void setCommand(String command) {
- this.command = command;
- }
-
- public Object getId() {
- return id;
- }
-
- public void setId(Object id) {
- this.id = id;
- }
-
- public List<String> getAnchors() {
- return anchors;
- }
-
- public void setAnchors(List<String> anchors) {
- this.anchors = anchors;
- }
-
- public void addAnchor(String anchor) {
- if (anchors == null) {
- anchors = new ArrayList<String>();
- }
- this.anchors.add(anchor);
- }
-
- public String getStream() {
- return stream;
- }
-
- public void setStream(String stream) {
- this.stream = stream;
- }
-
- public long getTask() {
- return task;
- }
-
- public void setTask(long task) {
- this.task = task;
- }
-
- public String getMsg() {
- return msg;
- }
-
- public void setMsg(String msg) {
- this.msg = msg;
- }
-
- public List<Object> getTuple() {
- return tuple;
- }
-
- public void setTuple(List<Object> tuple) {
- this.tuple = tuple;
- }
-
- public void addTuple(Object tuple) {
- if (this.tuple == null) {
- this.tuple = new ArrayList<Object>();
- }
- this.tuple.add(tuple);
- }
-
- public boolean areTaskIdsNeeded() {
- return needTaskIds;
- }
-
- public void setNeedTaskIds(boolean needTaskIds) {
- this.needTaskIds = needTaskIds;
- }
-
- public void setMetricName(String metricName) {
- this.metricName = metricName;
- }
-
- public String getMetricName() {
- return this.metricName;
- }
-
- public void setMetricParams(Object metricParams) {
- this.metricParams = metricParams;
- }
-
- public Object getMetricParams() {
- return metricParams;
- }
-
- public ShellLogLevel getLogLevel() {
- return logLevel;
- }
-
- public void setLogLevel(int logLevel) {
- this.logLevel = ShellLogLevel.fromInt(logLevel);
- }
-
- @Override
- public String toString() {
- return "ShellMsg{" +
- "command='" + command + '\'' +
- ", id=" + id +
- ", anchors=" + anchors +
- ", stream='" + stream + '\'' +
- ", task=" + task +
- ", msg='" + msg + '\'' +
- ", tuple=" + tuple +
- ", needTaskIds=" + needTaskIds +
- ", metricName='" + metricName + '\'' +
- ", metricParams=" + metricParams +
- ", logLevel=" + logLevel +
- '}';
- }
-}