You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/11/05 21:41:08 UTC

[29/60] [abbrv] [partial] storm git commit: Release 2.0.4-SNAPSHOT

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/generated/WorkerSummary.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/generated/WorkerSummary.java b/jstorm-client/src/main/java/backtype/storm/generated/WorkerSummary.java
deleted file mode 100644
index 0bf878c..0000000
--- a/jstorm-client/src/main/java/backtype/storm/generated/WorkerSummary.java
+++ /dev/null
@@ -1,560 +0,0 @@
-/**
- * Autogenerated by Thrift Compiler (0.7.0)
- *
- * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
- */
-package backtype.storm.generated;
-
-import org.apache.commons.lang.builder.HashCodeBuilder;
-import java.util.List;
-import java.util.ArrayList;
-import java.util.Map;
-import java.util.HashMap;
-import java.util.EnumMap;
-import java.util.Set;
-import java.util.HashSet;
-import java.util.EnumSet;
-import java.util.Collections;
-import java.util.BitSet;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class WorkerSummary implements org.apache.thrift7.TBase<WorkerSummary, WorkerSummary._Fields>, java.io.Serializable, Cloneable {
-  private static final org.apache.thrift7.protocol.TStruct STRUCT_DESC = new org.apache.thrift7.protocol.TStruct("WorkerSummary");
-
-  private static final org.apache.thrift7.protocol.TField PORT_FIELD_DESC = new org.apache.thrift7.protocol.TField("port", org.apache.thrift7.protocol.TType.I32, (short)1);
-  private static final org.apache.thrift7.protocol.TField TOPOLOGY_FIELD_DESC = new org.apache.thrift7.protocol.TField("topology", org.apache.thrift7.protocol.TType.STRING, (short)2);
-  private static final org.apache.thrift7.protocol.TField TASKS_FIELD_DESC = new org.apache.thrift7.protocol.TField("tasks", org.apache.thrift7.protocol.TType.LIST, (short)3);
-
-  private int port; // required
-  private String topology; // required
-  private List<TaskSummary> tasks; // required
-
-  /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
-  public enum _Fields implements org.apache.thrift7.TFieldIdEnum {
-    PORT((short)1, "port"),
-    TOPOLOGY((short)2, "topology"),
-    TASKS((short)3, "tasks");
-
-    private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
-
-    static {
-      for (_Fields field : EnumSet.allOf(_Fields.class)) {
-        byName.put(field.getFieldName(), field);
-      }
-    }
-
-    /**
-     * Find the _Fields constant that matches fieldId, or null if its not found.
-     */
-    public static _Fields findByThriftId(int fieldId) {
-      switch(fieldId) {
-        case 1: // PORT
-          return PORT;
-        case 2: // TOPOLOGY
-          return TOPOLOGY;
-        case 3: // TASKS
-          return TASKS;
-        default:
-          return null;
-      }
-    }
-
-    /**
-     * Find the _Fields constant that matches fieldId, throwing an exception
-     * if it is not found.
-     */
-    public static _Fields findByThriftIdOrThrow(int fieldId) {
-      _Fields fields = findByThriftId(fieldId);
-      if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
-      return fields;
-    }
-
-    /**
-     * Find the _Fields constant that matches name, or null if its not found.
-     */
-    public static _Fields findByName(String name) {
-      return byName.get(name);
-    }
-
-    private final short _thriftId;
-    private final String _fieldName;
-
-    _Fields(short thriftId, String fieldName) {
-      _thriftId = thriftId;
-      _fieldName = fieldName;
-    }
-
-    public short getThriftFieldId() {
-      return _thriftId;
-    }
-
-    public String getFieldName() {
-      return _fieldName;
-    }
-  }
-
-  // isset id assignments
-  private static final int __PORT_ISSET_ID = 0;
-  private BitSet __isset_bit_vector = new BitSet(1);
-
-  public static final Map<_Fields, org.apache.thrift7.meta_data.FieldMetaData> metaDataMap;
-  static {
-    Map<_Fields, org.apache.thrift7.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift7.meta_data.FieldMetaData>(_Fields.class);
-    tmpMap.put(_Fields.PORT, new org.apache.thrift7.meta_data.FieldMetaData("port", org.apache.thrift7.TFieldRequirementType.REQUIRED, 
-        new org.apache.thrift7.meta_data.FieldValueMetaData(org.apache.thrift7.protocol.TType.I32)));
-    tmpMap.put(_Fields.TOPOLOGY, new org.apache.thrift7.meta_data.FieldMetaData("topology", org.apache.thrift7.TFieldRequirementType.REQUIRED, 
-        new org.apache.thrift7.meta_data.FieldValueMetaData(org.apache.thrift7.protocol.TType.STRING)));
-    tmpMap.put(_Fields.TASKS, new org.apache.thrift7.meta_data.FieldMetaData("tasks", org.apache.thrift7.TFieldRequirementType.REQUIRED, 
-        new org.apache.thrift7.meta_data.ListMetaData(org.apache.thrift7.protocol.TType.LIST, 
-            new org.apache.thrift7.meta_data.StructMetaData(org.apache.thrift7.protocol.TType.STRUCT, TaskSummary.class))));
-    metaDataMap = Collections.unmodifiableMap(tmpMap);
-    org.apache.thrift7.meta_data.FieldMetaData.addStructMetaDataMap(WorkerSummary.class, metaDataMap);
-  }
-
-  public WorkerSummary() {
-  }
-
-  public WorkerSummary(
-    int port,
-    String topology,
-    List<TaskSummary> tasks)
-  {
-    this();
-    this.port = port;
-    set_port_isSet(true);
-    this.topology = topology;
-    this.tasks = tasks;
-  }
-
-  /**
-   * Performs a deep copy on <i>other</i>.
-   */
-  public WorkerSummary(WorkerSummary other) {
-    __isset_bit_vector.clear();
-    __isset_bit_vector.or(other.__isset_bit_vector);
-    this.port = other.port;
-    if (other.is_set_topology()) {
-      this.topology = other.topology;
-    }
-    if (other.is_set_tasks()) {
-      List<TaskSummary> __this__tasks = new ArrayList<TaskSummary>();
-      for (TaskSummary other_element : other.tasks) {
-        __this__tasks.add(new TaskSummary(other_element));
-      }
-      this.tasks = __this__tasks;
-    }
-  }
-
-  public WorkerSummary deepCopy() {
-    return new WorkerSummary(this);
-  }
-
-  @Override
-  public void clear() {
-    set_port_isSet(false);
-    this.port = 0;
-    this.topology = null;
-    this.tasks = null;
-  }
-
-  public int get_port() {
-    return this.port;
-  }
-
-  public void set_port(int port) {
-    this.port = port;
-    set_port_isSet(true);
-  }
-
-  public void unset_port() {
-    __isset_bit_vector.clear(__PORT_ISSET_ID);
-  }
-
-  /** Returns true if field port is set (has been assigned a value) and false otherwise */
-  public boolean is_set_port() {
-    return __isset_bit_vector.get(__PORT_ISSET_ID);
-  }
-
-  public void set_port_isSet(boolean value) {
-    __isset_bit_vector.set(__PORT_ISSET_ID, value);
-  }
-
-  public String get_topology() {
-    return this.topology;
-  }
-
-  public void set_topology(String topology) {
-    this.topology = topology;
-  }
-
-  public void unset_topology() {
-    this.topology = null;
-  }
-
-  /** Returns true if field topology is set (has been assigned a value) and false otherwise */
-  public boolean is_set_topology() {
-    return this.topology != null;
-  }
-
-  public void set_topology_isSet(boolean value) {
-    if (!value) {
-      this.topology = null;
-    }
-  }
-
-  public int get_tasks_size() {
-    return (this.tasks == null) ? 0 : this.tasks.size();
-  }
-
-  public java.util.Iterator<TaskSummary> get_tasks_iterator() {
-    return (this.tasks == null) ? null : this.tasks.iterator();
-  }
-
-  public void add_to_tasks(TaskSummary elem) {
-    if (this.tasks == null) {
-      this.tasks = new ArrayList<TaskSummary>();
-    }
-    this.tasks.add(elem);
-  }
-
-  public List<TaskSummary> get_tasks() {
-    return this.tasks;
-  }
-
-  public void set_tasks(List<TaskSummary> tasks) {
-    this.tasks = tasks;
-  }
-
-  public void unset_tasks() {
-    this.tasks = null;
-  }
-
-  /** Returns true if field tasks is set (has been assigned a value) and false otherwise */
-  public boolean is_set_tasks() {
-    return this.tasks != null;
-  }
-
-  public void set_tasks_isSet(boolean value) {
-    if (!value) {
-      this.tasks = null;
-    }
-  }
-
-  public void setFieldValue(_Fields field, Object value) {
-    switch (field) {
-    case PORT:
-      if (value == null) {
-        unset_port();
-      } else {
-        set_port((Integer)value);
-      }
-      break;
-
-    case TOPOLOGY:
-      if (value == null) {
-        unset_topology();
-      } else {
-        set_topology((String)value);
-      }
-      break;
-
-    case TASKS:
-      if (value == null) {
-        unset_tasks();
-      } else {
-        set_tasks((List<TaskSummary>)value);
-      }
-      break;
-
-    }
-  }
-
-  public Object getFieldValue(_Fields field) {
-    switch (field) {
-    case PORT:
-      return Integer.valueOf(get_port());
-
-    case TOPOLOGY:
-      return get_topology();
-
-    case TASKS:
-      return get_tasks();
-
-    }
-    throw new IllegalStateException();
-  }
-
-  /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
-  public boolean isSet(_Fields field) {
-    if (field == null) {
-      throw new IllegalArgumentException();
-    }
-
-    switch (field) {
-    case PORT:
-      return is_set_port();
-    case TOPOLOGY:
-      return is_set_topology();
-    case TASKS:
-      return is_set_tasks();
-    }
-    throw new IllegalStateException();
-  }
-
-  @Override
-  public boolean equals(Object that) {
-    if (that == null)
-      return false;
-    if (that instanceof WorkerSummary)
-      return this.equals((WorkerSummary)that);
-    return false;
-  }
-
-  public boolean equals(WorkerSummary that) {
-    if (that == null)
-      return false;
-
-    boolean this_present_port = true;
-    boolean that_present_port = true;
-    if (this_present_port || that_present_port) {
-      if (!(this_present_port && that_present_port))
-        return false;
-      if (this.port != that.port)
-        return false;
-    }
-
-    boolean this_present_topology = true && this.is_set_topology();
-    boolean that_present_topology = true && that.is_set_topology();
-    if (this_present_topology || that_present_topology) {
-      if (!(this_present_topology && that_present_topology))
-        return false;
-      if (!this.topology.equals(that.topology))
-        return false;
-    }
-
-    boolean this_present_tasks = true && this.is_set_tasks();
-    boolean that_present_tasks = true && that.is_set_tasks();
-    if (this_present_tasks || that_present_tasks) {
-      if (!(this_present_tasks && that_present_tasks))
-        return false;
-      if (!this.tasks.equals(that.tasks))
-        return false;
-    }
-
-    return true;
-  }
-
-  @Override
-  public int hashCode() {
-    HashCodeBuilder builder = new HashCodeBuilder();
-
-    boolean present_port = true;
-    builder.append(present_port);
-    if (present_port)
-      builder.append(port);
-
-    boolean present_topology = true && (is_set_topology());
-    builder.append(present_topology);
-    if (present_topology)
-      builder.append(topology);
-
-    boolean present_tasks = true && (is_set_tasks());
-    builder.append(present_tasks);
-    if (present_tasks)
-      builder.append(tasks);
-
-    return builder.toHashCode();
-  }
-
-  public int compareTo(WorkerSummary other) {
-    if (!getClass().equals(other.getClass())) {
-      return getClass().getName().compareTo(other.getClass().getName());
-    }
-
-    int lastComparison = 0;
-    WorkerSummary typedOther = (WorkerSummary)other;
-
-    lastComparison = Boolean.valueOf(is_set_port()).compareTo(typedOther.is_set_port());
-    if (lastComparison != 0) {
-      return lastComparison;
-    }
-    if (is_set_port()) {
-      lastComparison = org.apache.thrift7.TBaseHelper.compareTo(this.port, typedOther.port);
-      if (lastComparison != 0) {
-        return lastComparison;
-      }
-    }
-    lastComparison = Boolean.valueOf(is_set_topology()).compareTo(typedOther.is_set_topology());
-    if (lastComparison != 0) {
-      return lastComparison;
-    }
-    if (is_set_topology()) {
-      lastComparison = org.apache.thrift7.TBaseHelper.compareTo(this.topology, typedOther.topology);
-      if (lastComparison != 0) {
-        return lastComparison;
-      }
-    }
-    lastComparison = Boolean.valueOf(is_set_tasks()).compareTo(typedOther.is_set_tasks());
-    if (lastComparison != 0) {
-      return lastComparison;
-    }
-    if (is_set_tasks()) {
-      lastComparison = org.apache.thrift7.TBaseHelper.compareTo(this.tasks, typedOther.tasks);
-      if (lastComparison != 0) {
-        return lastComparison;
-      }
-    }
-    return 0;
-  }
-
-  public _Fields fieldForId(int fieldId) {
-    return _Fields.findByThriftId(fieldId);
-  }
-
-  public void read(org.apache.thrift7.protocol.TProtocol iprot) throws org.apache.thrift7.TException {
-    org.apache.thrift7.protocol.TField field;
-    iprot.readStructBegin();
-    while (true)
-    {
-      field = iprot.readFieldBegin();
-      if (field.type == org.apache.thrift7.protocol.TType.STOP) { 
-        break;
-      }
-      switch (field.id) {
-        case 1: // PORT
-          if (field.type == org.apache.thrift7.protocol.TType.I32) {
-            this.port = iprot.readI32();
-            set_port_isSet(true);
-          } else { 
-            org.apache.thrift7.protocol.TProtocolUtil.skip(iprot, field.type);
-          }
-          break;
-        case 2: // TOPOLOGY
-          if (field.type == org.apache.thrift7.protocol.TType.STRING) {
-            this.topology = iprot.readString();
-          } else { 
-            org.apache.thrift7.protocol.TProtocolUtil.skip(iprot, field.type);
-          }
-          break;
-        case 3: // TASKS
-          if (field.type == org.apache.thrift7.protocol.TType.LIST) {
-            {
-              org.apache.thrift7.protocol.TList _list189 = iprot.readListBegin();
-              this.tasks = new ArrayList<TaskSummary>(_list189.size);
-              for (int _i190 = 0; _i190 < _list189.size; ++_i190)
-              {
-                TaskSummary _elem191; // required
-                _elem191 = new TaskSummary();
-                _elem191.read(iprot);
-                this.tasks.add(_elem191);
-              }
-              iprot.readListEnd();
-            }
-          } else { 
-            org.apache.thrift7.protocol.TProtocolUtil.skip(iprot, field.type);
-          }
-          break;
-        default:
-          org.apache.thrift7.protocol.TProtocolUtil.skip(iprot, field.type);
-      }
-      iprot.readFieldEnd();
-    }
-    iprot.readStructEnd();
-    validate();
-  }
-
-  public void write(org.apache.thrift7.protocol.TProtocol oprot) throws org.apache.thrift7.TException {
-    validate();
-
-    oprot.writeStructBegin(STRUCT_DESC);
-    oprot.writeFieldBegin(PORT_FIELD_DESC);
-    oprot.writeI32(this.port);
-    oprot.writeFieldEnd();
-    if (this.topology != null) {
-      oprot.writeFieldBegin(TOPOLOGY_FIELD_DESC);
-      oprot.writeString(this.topology);
-      oprot.writeFieldEnd();
-    }
-    if (this.tasks != null) {
-      oprot.writeFieldBegin(TASKS_FIELD_DESC);
-      {
-        oprot.writeListBegin(new org.apache.thrift7.protocol.TList(org.apache.thrift7.protocol.TType.STRUCT, this.tasks.size()));
-        for (TaskSummary _iter192 : this.tasks)
-        {
-          _iter192.write(oprot);
-        }
-        oprot.writeListEnd();
-      }
-      oprot.writeFieldEnd();
-    }
-    oprot.writeFieldStop();
-    oprot.writeStructEnd();
-  }
-
-  @Override
-  public String toString() {
-    StringBuilder sb = new StringBuilder("WorkerSummary(");
-    boolean first = true;
-
-    sb.append("port:");
-    sb.append(this.port);
-    first = false;
-    if (!first) sb.append(", ");
-    sb.append("topology:");
-    if (this.topology == null) {
-      sb.append("null");
-    } else {
-      sb.append(this.topology);
-    }
-    first = false;
-    if (!first) sb.append(", ");
-    sb.append("tasks:");
-    if (this.tasks == null) {
-      sb.append("null");
-    } else {
-      sb.append(this.tasks);
-    }
-    first = false;
-    sb.append(")");
-    return sb.toString();
-  }
-
-  public void validate() throws org.apache.thrift7.TException {
-    // check for required fields
-    if (!is_set_port()) {
-      throw new org.apache.thrift7.protocol.TProtocolException("Required field 'port' is unset! Struct:" + toString());
-    }
-
-    if (!is_set_topology()) {
-      throw new org.apache.thrift7.protocol.TProtocolException("Required field 'topology' is unset! Struct:" + toString());
-    }
-
-    if (!is_set_tasks()) {
-      throw new org.apache.thrift7.protocol.TProtocolException("Required field 'tasks' is unset! Struct:" + toString());
-    }
-
-  }
-
-  private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
-    try {
-      write(new org.apache.thrift7.protocol.TCompactProtocol(new org.apache.thrift7.transport.TIOStreamTransport(out)));
-    } catch (org.apache.thrift7.TException te) {
-      throw new java.io.IOException(te);
-    }
-  }
-
-  private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
-    try {
-      // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor.
-      __isset_bit_vector = new BitSet(1);
-      read(new org.apache.thrift7.protocol.TCompactProtocol(new org.apache.thrift7.transport.TIOStreamTransport(in)));
-    } catch (org.apache.thrift7.TException te) {
-      throw new java.io.IOException(te);
-    }
-  }
-
-}
-

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/grouping/CustomStreamGrouping.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/grouping/CustomStreamGrouping.java b/jstorm-client/src/main/java/backtype/storm/grouping/CustomStreamGrouping.java
deleted file mode 100644
index 15e37a8..0000000
--- a/jstorm-client/src/main/java/backtype/storm/grouping/CustomStreamGrouping.java
+++ /dev/null
@@ -1,29 +0,0 @@
-package backtype.storm.grouping;
-
-import backtype.storm.generated.GlobalStreamId;
-import backtype.storm.task.WorkerTopologyContext;
-import java.io.Serializable;
-import java.util.List;
-
-public interface CustomStreamGrouping extends Serializable {
-
-	/**
-	 * Tells the stream grouping at runtime the tasks in the target bolt. This
-	 * information should be used in chooseTasks to determine the target tasks.
-	 * 
-	 * It also tells the grouping the metadata on the stream this grouping will
-	 * be used on.
-	 */
-	void prepare(WorkerTopologyContext context, GlobalStreamId stream,
-			List<Integer> targetTasks);
-
-	/**
-	 * This function implements a custom stream grouping. It takes in as input
-	 * the number of tasks in the target bolt in prepare and returns the tasks
-	 * to send the tuples to.
-	 * 
-	 * @param values
-	 *            the values to group on
-	 */
-	List<Integer> chooseTasks(int taskId, List<Object> values);
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/hooks/BaseTaskHook.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/hooks/BaseTaskHook.java b/jstorm-client/src/main/java/backtype/storm/hooks/BaseTaskHook.java
deleted file mode 100644
index a2aac33..0000000
--- a/jstorm-client/src/main/java/backtype/storm/hooks/BaseTaskHook.java
+++ /dev/null
@@ -1,44 +0,0 @@
-package backtype.storm.hooks;
-
-import backtype.storm.hooks.info.BoltAckInfo;
-import backtype.storm.hooks.info.BoltExecuteInfo;
-import backtype.storm.hooks.info.BoltFailInfo;
-import backtype.storm.hooks.info.EmitInfo;
-import backtype.storm.hooks.info.SpoutAckInfo;
-import backtype.storm.hooks.info.SpoutFailInfo;
-import backtype.storm.task.TopologyContext;
-import java.util.Map;
-
-public class BaseTaskHook implements ITaskHook {
-	@Override
-	public void prepare(Map conf, TopologyContext context) {
-	}
-
-	@Override
-	public void cleanup() {
-	}
-
-	@Override
-	public void emit(EmitInfo info) {
-	}
-
-	@Override
-	public void spoutAck(SpoutAckInfo info) {
-	}
-
-	@Override
-	public void spoutFail(SpoutFailInfo info) {
-	}
-
-	@Override
-	public void boltAck(BoltAckInfo info) {
-	}
-
-	@Override
-	public void boltFail(BoltFailInfo info) {
-	}
-
-	@Override
-	public void boltExecute(BoltExecuteInfo info) {
-	}
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/hooks/ITaskHook.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/hooks/ITaskHook.java b/jstorm-client/src/main/java/backtype/storm/hooks/ITaskHook.java
deleted file mode 100644
index f705f12..0000000
--- a/jstorm-client/src/main/java/backtype/storm/hooks/ITaskHook.java
+++ /dev/null
@@ -1,28 +0,0 @@
-package backtype.storm.hooks;
-
-import backtype.storm.hooks.info.BoltAckInfo;
-import backtype.storm.hooks.info.BoltExecuteInfo;
-import backtype.storm.hooks.info.SpoutFailInfo;
-import backtype.storm.hooks.info.SpoutAckInfo;
-import backtype.storm.hooks.info.EmitInfo;
-import backtype.storm.hooks.info.BoltFailInfo;
-import backtype.storm.task.TopologyContext;
-import java.util.Map;
-
-public interface ITaskHook {
-	void prepare(Map conf, TopologyContext context);
-
-	void cleanup();
-
-	void emit(EmitInfo info);
-
-	void spoutAck(SpoutAckInfo info);
-
-	void spoutFail(SpoutFailInfo info);
-
-	void boltExecute(BoltExecuteInfo info);
-
-	void boltAck(BoltAckInfo info);
-
-	void boltFail(BoltFailInfo info);
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/hooks/info/BoltAckInfo.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/hooks/info/BoltAckInfo.java b/jstorm-client/src/main/java/backtype/storm/hooks/info/BoltAckInfo.java
deleted file mode 100644
index b0f0a9b..0000000
--- a/jstorm-client/src/main/java/backtype/storm/hooks/info/BoltAckInfo.java
+++ /dev/null
@@ -1,15 +0,0 @@
-package backtype.storm.hooks.info;
-
-import backtype.storm.tuple.Tuple;
-
-public class BoltAckInfo {
-	public Tuple tuple;
-	public int ackingTaskId;
-	public Long processLatencyMs; // null if it wasn't sampled
-
-	public BoltAckInfo(Tuple tuple, int ackingTaskId, Long processLatencyMs) {
-		this.tuple = tuple;
-		this.ackingTaskId = ackingTaskId;
-		this.processLatencyMs = processLatencyMs;
-	}
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/hooks/info/BoltExecuteInfo.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/hooks/info/BoltExecuteInfo.java b/jstorm-client/src/main/java/backtype/storm/hooks/info/BoltExecuteInfo.java
deleted file mode 100644
index 31ca373..0000000
--- a/jstorm-client/src/main/java/backtype/storm/hooks/info/BoltExecuteInfo.java
+++ /dev/null
@@ -1,16 +0,0 @@
-package backtype.storm.hooks.info;
-
-import backtype.storm.tuple.Tuple;
-
-public class BoltExecuteInfo {
-	public Tuple tuple;
-	public int executingTaskId;
-	public Long executeLatencyMs; // null if it wasn't sampled
-
-	public BoltExecuteInfo(Tuple tuple, int executingTaskId,
-			Long executeLatencyMs) {
-		this.tuple = tuple;
-		this.executingTaskId = executingTaskId;
-		this.executeLatencyMs = executeLatencyMs;
-	}
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/hooks/info/BoltFailInfo.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/hooks/info/BoltFailInfo.java b/jstorm-client/src/main/java/backtype/storm/hooks/info/BoltFailInfo.java
deleted file mode 100644
index 3a3dfec..0000000
--- a/jstorm-client/src/main/java/backtype/storm/hooks/info/BoltFailInfo.java
+++ /dev/null
@@ -1,15 +0,0 @@
-package backtype.storm.hooks.info;
-
-import backtype.storm.tuple.Tuple;
-
-public class BoltFailInfo {
-	public Tuple tuple;
-	public int failingTaskId;
-	public Long failLatencyMs; // null if it wasn't sampled
-
-	public BoltFailInfo(Tuple tuple, int failingTaskId, Long failLatencyMs) {
-		this.tuple = tuple;
-		this.failingTaskId = failingTaskId;
-		this.failLatencyMs = failLatencyMs;
-	}
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/hooks/info/EmitInfo.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/hooks/info/EmitInfo.java b/jstorm-client/src/main/java/backtype/storm/hooks/info/EmitInfo.java
deleted file mode 100644
index 39b9688..0000000
--- a/jstorm-client/src/main/java/backtype/storm/hooks/info/EmitInfo.java
+++ /dev/null
@@ -1,19 +0,0 @@
-package backtype.storm.hooks.info;
-
-import java.util.Collection;
-import java.util.List;
-
-public class EmitInfo {
-	public List<Object> values;
-	public String stream;
-	public int taskId;
-	public Collection<Integer> outTasks;
-
-	public EmitInfo(List<Object> values, String stream, int taskId,
-			Collection<Integer> outTasks) {
-		this.values = values;
-		this.stream = stream;
-		this.taskId = taskId;
-		this.outTasks = outTasks;
-	}
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/hooks/info/SpoutAckInfo.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/hooks/info/SpoutAckInfo.java b/jstorm-client/src/main/java/backtype/storm/hooks/info/SpoutAckInfo.java
deleted file mode 100644
index f74efae..0000000
--- a/jstorm-client/src/main/java/backtype/storm/hooks/info/SpoutAckInfo.java
+++ /dev/null
@@ -1,14 +0,0 @@
-package backtype.storm.hooks.info;
-
-public class SpoutAckInfo {
-	public Object messageId;
-	public int spoutTaskId;
-	public Long completeLatencyMs; // null if it wasn't sampled
-
-	public SpoutAckInfo(Object messageId, int spoutTaskId,
-			Long completeLatencyMs) {
-		this.messageId = messageId;
-		this.spoutTaskId = spoutTaskId;
-		this.completeLatencyMs = completeLatencyMs;
-	}
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/hooks/info/SpoutFailInfo.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/hooks/info/SpoutFailInfo.java b/jstorm-client/src/main/java/backtype/storm/hooks/info/SpoutFailInfo.java
deleted file mode 100644
index 8052b4a..0000000
--- a/jstorm-client/src/main/java/backtype/storm/hooks/info/SpoutFailInfo.java
+++ /dev/null
@@ -1,13 +0,0 @@
-package backtype.storm.hooks.info;
-
-public class SpoutFailInfo {
-	public Object messageId;
-	public int spoutTaskId;
-	public Long failLatencyMs; // null if it wasn't sampled
-
-	public SpoutFailInfo(Object messageId, int spoutTaskId, Long failLatencyMs) {
-		this.messageId = messageId;
-		this.spoutTaskId = spoutTaskId;
-		this.failLatencyMs = failLatencyMs;
-	}
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/messaging/IConnection.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/messaging/IConnection.java b/jstorm-client/src/main/java/backtype/storm/messaging/IConnection.java
deleted file mode 100644
index f61e818..0000000
--- a/jstorm-client/src/main/java/backtype/storm/messaging/IConnection.java
+++ /dev/null
@@ -1,36 +0,0 @@
-package backtype.storm.messaging;
-
-import java.util.List;
-
-import backtype.storm.utils.DisruptorQueue;
-
-public interface IConnection {
-	
-	/**
-	 * (flags != 1) synchronously 
-	 * (flags==1) asynchronously 
-	 * 
-	 * @param flags
-	 * @return
-	 */
-	public TaskMessage recv(int flags);
-	
-	/**
-	 * In the new design, receive flow is through registerQueue, 
-	 * then push message into queue 
-	 * 
-	 * @param recvQueu
-	 */
-	public void registerQueue(DisruptorQueue recvQueu);
-	public void enqueue(TaskMessage message);
-	
-	public void send(List<TaskMessage> messages);
-	public void send(TaskMessage message);
-
-	/**
-	 * close this connection
-	 */
-	public void close();
-
-	public boolean isClosed();
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/messaging/IContext.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/messaging/IContext.java b/jstorm-client/src/main/java/backtype/storm/messaging/IContext.java
deleted file mode 100644
index 760b6e5..0000000
--- a/jstorm-client/src/main/java/backtype/storm/messaging/IContext.java
+++ /dev/null
@@ -1,58 +0,0 @@
-package backtype.storm.messaging;
-
-import java.util.Map;
-
-import backtype.storm.utils.DisruptorQueue;
-
-/**
- * This interface needs to be implemented for messaging plugin.
- * 
- * Messaging plugin is specified via Storm config parameter,
- * storm.messaging.transport.
- * 
- * A messaging plugin should have a default constructor and implements IContext
- * interface. Upon construction, we will invoke IContext::prepare(storm_conf) to
- * enable context to be configured according to storm configuration.
- */
-public interface IContext {
-	/**
-	 * This method is invoked at the startup of messaging plugin
-	 * 
-	 * @param storm_conf
-	 *            storm configuration
-	 */
-	public void prepare(Map storm_conf);
-
-	/**
-	 * This method is invoked when a worker is unload a messaging plugin
-	 */
-	public void term();
-
-	/**
-	 * This method establishes a server side connection
-	 * 
-	 * @param topology_id
-	 *            topology ID
-	 * @param port
-	 *            port #
-	 * @param distribute
-	 *            true -- receive other worker's data
-	 * @return server side connection
-	 */
-	public IConnection bind(String topology_id, int port);
-
-	/**
-	 * This method establish a client side connection to a remote server
-	 * 
-	 * @param topology_id
-	 *            topology ID
-	 * @param host
-	 *            remote host
-	 * @param port
-	 *            remote port
-	 * @param distribute
-	 *            true -- send other worker data
-	 * @return client side connection
-	 */
-	public IConnection connect(String topology_id, String host, int port);
-};

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/messaging/TaskMessage.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/messaging/TaskMessage.java b/jstorm-client/src/main/java/backtype/storm/messaging/TaskMessage.java
deleted file mode 100644
index cab968f..0000000
--- a/jstorm-client/src/main/java/backtype/storm/messaging/TaskMessage.java
+++ /dev/null
@@ -1,51 +0,0 @@
-package backtype.storm.messaging;
-
-import java.nio.ByteBuffer;
-
-public class TaskMessage {
-	private int _task;
-	private byte[] _message;
-
-	public TaskMessage(int task, byte[] message) {
-		_task = task;
-		_message = message;
-	}
-
-	public int task() {
-		return _task;
-	}
-
-	public byte[] message() {
-		return _message;
-	}
-	
-	public static boolean isEmpty(TaskMessage message) {
-		if (message == null) {
-			return true;
-		}else if (message.message() == null) {
-			return true;
-		}else if (message.message().length == 0) {
-			return true;
-		}
-		
-		return false;
-	}
-
-	@Deprecated
-	public ByteBuffer serialize() {
-		ByteBuffer bb = ByteBuffer.allocate(_message.length + 2);
-		bb.putShort((short) _task);
-		bb.put(_message);
-		return bb;
-	}
-
-	@Deprecated
-	public void deserialize(ByteBuffer packet) {
-		if (packet == null)
-			return;
-		_task = packet.getShort();
-		_message = new byte[packet.limit() - 2];
-		packet.get(_message);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/messaging/TransportFactory.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/messaging/TransportFactory.java b/jstorm-client/src/main/java/backtype/storm/messaging/TransportFactory.java
deleted file mode 100644
index 8830496..0000000
--- a/jstorm-client/src/main/java/backtype/storm/messaging/TransportFactory.java
+++ /dev/null
@@ -1,54 +0,0 @@
-package backtype.storm.messaging;
-
-import java.lang.reflect.Constructor;
-import java.lang.reflect.Method;
-import java.util.Map;
-
-import org.apache.log4j.Logger;
-
-import backtype.storm.Config;
-
-public class TransportFactory {
-	public static final Logger LOG = Logger.getLogger(TransportFactory.class);
-
-	public static IContext makeContext(Map storm_conf) {
-
-		// get factory class name
-		String transport_plugin_klassName = (String) storm_conf
-				.get(Config.STORM_MESSAGING_TRANSPORT);
-		LOG.info("JStorm peer transport plugin:" + transport_plugin_klassName);
-
-		IContext transport = null;
-		try {
-			// create a factory class
-			Class klass = Class.forName(transport_plugin_klassName);
-			// obtain a context object
-			// Object obj = klass.newInstance();
-			Constructor constructor = klass.getDeclaredConstructor();
-			constructor.setAccessible(true);
-			Object obj = constructor.newInstance();
-			LOG.info("TransportFactory makeContext: new klass: " + obj);
-			if (obj instanceof IContext) {
-				// case 1: plugin is a IContext class
-				transport = (IContext) obj;
-				// initialize with storm configuration
-				transport.prepare(storm_conf);
-				LOG.info("TransportFactory makeContext: start prepare... "
-						+ storm_conf);
-			} else {
-				// case 2: Non-IContext plugin must have a
-				// makeContext(storm_conf) method that returns IContext object
-				Method method = klass.getMethod("makeContext", Map.class);
-				LOG.debug("object:" + obj + " method:" + method);
-				transport = (IContext) method.invoke(obj, storm_conf);
-			}
-			LOG.info("TransportFactory makeContext done...");
-		} catch (Exception e) {
-			throw new RuntimeException(
-					"Fail to construct messaging plugin from plugin "
-							+ transport_plugin_klassName, e);
-		}
-		return transport;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/metric/LoggingMetricsConsumer.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/metric/LoggingMetricsConsumer.java b/jstorm-client/src/main/java/backtype/storm/metric/LoggingMetricsConsumer.java
deleted file mode 100644
index 19c2235..0000000
--- a/jstorm-client/src/main/java/backtype/storm/metric/LoggingMetricsConsumer.java
+++ /dev/null
@@ -1,59 +0,0 @@
-package backtype.storm.metric;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Collection;
-import java.util.Map;
-
-import backtype.storm.metric.api.IMetricsConsumer;
-import backtype.storm.task.IErrorReporter;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.utils.Utils;
-
-/*
- * Listens for all metrics, dumps them to log
- *
- * To use, add this to your topology's configuration:
- *   conf.registerMetricsConsumer(backtype.storm.metrics.LoggingMetricsConsumer.class, 1);
- *
- * Or edit the storm.yaml config file:
- *
- *   topology.metrics.consumer.register:
- *     - class: "backtype.storm.metrics.LoggingMetricsConsumer"
- *       parallelism.hint: 1
- *
- */
-public class LoggingMetricsConsumer implements IMetricsConsumer {
-	public static final Logger LOG = LoggerFactory
-			.getLogger(LoggingMetricsConsumer.class);
-
-	@Override
-	public void prepare(Map stormConf, Object registrationArgument,
-			TopologyContext context, IErrorReporter errorReporter) {
-	}
-
-	static private String padding = "                       ";
-
-	@Override
-	public void handleDataPoints(TaskInfo taskInfo,
-			Collection<DataPoint> dataPoints) {
-		StringBuilder sb = new StringBuilder();
-		String header = String.format("%d\t%15s:%-4d\t%3d:%-11s\t",
-				taskInfo.timestamp, taskInfo.srcWorkerHost,
-				taskInfo.srcWorkerPort, taskInfo.srcTaskId,
-				taskInfo.srcComponentId);
-		sb.append(header);
-		for (DataPoint p : dataPoints) {
-			sb.delete(header.length(), sb.length());
-			sb.append(p.name).append(padding)
-					.delete(header.length() + 23, sb.length()).append("\t")
-					.append(p.value);
-			LOG.info(sb.toString());
-		}
-	}
-
-	@Override
-	public void cleanup() {
-	}
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/metric/MetricsConsumerBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/metric/MetricsConsumerBolt.java b/jstorm-client/src/main/java/backtype/storm/metric/MetricsConsumerBolt.java
deleted file mode 100644
index 994cb56..0000000
--- a/jstorm-client/src/main/java/backtype/storm/metric/MetricsConsumerBolt.java
+++ /dev/null
@@ -1,56 +0,0 @@
-package backtype.storm.metric;
-
-import backtype.storm.Config;
-import backtype.storm.metric.api.IMetricsConsumer;
-import backtype.storm.task.IBolt;
-import backtype.storm.task.IErrorReporter;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.tuple.Tuple;
-import java.util.Collection;
-import java.util.Map;
-
-public class MetricsConsumerBolt implements IBolt {
-	IMetricsConsumer _metricsConsumer;
-	String _consumerClassName;
-	OutputCollector _collector;
-	Object _registrationArgument;
-
-	public MetricsConsumerBolt(String consumerClassName,
-			Object registrationArgument) {
-		_consumerClassName = consumerClassName;
-		_registrationArgument = registrationArgument;
-	}
-
-	@Override
-	public void prepare(Map stormConf, TopologyContext context,
-			OutputCollector collector) {
-		try {
-			_metricsConsumer = (IMetricsConsumer) Class.forName(
-					_consumerClassName).newInstance();
-		} catch (Exception e) {
-			throw new RuntimeException(
-					"Could not instantiate a class listed in config under section "
-							+ Config.TOPOLOGY_METRICS_CONSUMER_REGISTER
-							+ " with fully qualified name "
-							+ _consumerClassName, e);
-		}
-		_metricsConsumer.prepare(stormConf, _registrationArgument, context,
-				(IErrorReporter) collector);
-		_collector = collector;
-	}
-
-	@Override
-	public void execute(Tuple input) {
-		_metricsConsumer.handleDataPoints(
-				(IMetricsConsumer.TaskInfo) input.getValue(0),
-				(Collection) input.getValue(1));
-		_collector.ack(input);
-	}
-
-	@Override
-	public void cleanup() {
-		_metricsConsumer.cleanup();
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/metric/SystemBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/metric/SystemBolt.java b/jstorm-client/src/main/java/backtype/storm/metric/SystemBolt.java
deleted file mode 100644
index 07bdc28..0000000
--- a/jstorm-client/src/main/java/backtype/storm/metric/SystemBolt.java
+++ /dev/null
@@ -1,157 +0,0 @@
-package backtype.storm.metric;
-
-import backtype.storm.Config;
-import backtype.storm.metric.api.AssignableMetric;
-import backtype.storm.metric.api.IMetric;
-import backtype.storm.task.IBolt;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.tuple.Tuple;
-import clojure.lang.AFn;
-import clojure.lang.IFn;
-import clojure.lang.RT;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.lang.management.*;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-// There is one task inside one executor for each worker of the topology.
-// TaskID is always -1, therefore you can only send-unanchored tuples to co-located SystemBolt.
-// This bolt was conceived to export worker stats via metrics api.
-public class SystemBolt implements IBolt {
-	private static Logger LOG = LoggerFactory.getLogger(SystemBolt.class);
-	private static boolean _prepareWasCalled = false;
-
-	private static class MemoryUsageMetric implements IMetric {
-		IFn _getUsage;
-
-		public MemoryUsageMetric(IFn getUsage) {
-			_getUsage = getUsage;
-		}
-
-		@Override
-		public Object getValueAndReset() {
-			MemoryUsage memUsage;
-			try {
-				memUsage = (MemoryUsage) _getUsage.invoke();
-			} catch (Exception e) {
-				LOG.error("Failed to get userage ", e);
-				throw new RuntimeException(e);
-			}
-			HashMap m = new HashMap();
-			m.put("maxBytes", memUsage.getMax());
-			m.put("committedBytes", memUsage.getCommitted());
-			m.put("initBytes", memUsage.getInit());
-			m.put("usedBytes", memUsage.getUsed());
-			m.put("virtualFreeBytes", memUsage.getMax() - memUsage.getUsed());
-			m.put("unusedBytes", memUsage.getCommitted() - memUsage.getUsed());
-			return m;
-		}
-	}
-
-	// canonically the metrics data exported is time bucketed when doing counts.
-	// convert the absolute values here into time buckets.
-	private static class GarbageCollectorMetric implements IMetric {
-		GarbageCollectorMXBean _gcBean;
-		Long _collectionCount;
-		Long _collectionTime;
-
-		public GarbageCollectorMetric(GarbageCollectorMXBean gcBean) {
-			_gcBean = gcBean;
-		}
-
-		@Override
-		public Object getValueAndReset() {
-			Long collectionCountP = _gcBean.getCollectionCount();
-			Long collectionTimeP = _gcBean.getCollectionTime();
-
-			Map ret = null;
-			if (_collectionCount != null && _collectionTime != null) {
-				ret = new HashMap();
-				ret.put("count", collectionCountP - _collectionCount);
-				ret.put("timeMs", collectionTimeP - _collectionTime);
-			}
-
-			_collectionCount = collectionCountP;
-			_collectionTime = collectionTimeP;
-			return ret;
-		}
-	}
-
-	@Override
-	public void prepare(final Map stormConf, TopologyContext context,
-			OutputCollector collector) {
-		if (_prepareWasCalled
-				&& !"local".equals(stormConf.get(Config.STORM_CLUSTER_MODE))) {
-			throw new RuntimeException(
-					"A single worker should have 1 SystemBolt instance.");
-		}
-		_prepareWasCalled = true;
-
-		int bucketSize = RT.intCast(stormConf
-				.get(Config.TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS));
-
-		final RuntimeMXBean jvmRT = ManagementFactory.getRuntimeMXBean();
-
-		context.registerMetric("uptimeSecs", new IMetric() {
-			@Override
-			public Object getValueAndReset() {
-				return jvmRT.getUptime() / 1000.0;
-			}
-		}, bucketSize);
-
-		context.registerMetric("startTimeSecs", new IMetric() {
-			@Override
-			public Object getValueAndReset() {
-				return jvmRT.getStartTime() / 1000.0;
-			}
-		}, bucketSize);
-
-		context.registerMetric("newWorkerEvent", new IMetric() {
-			boolean doEvent = true;
-
-			@Override
-			public Object getValueAndReset() {
-				if (doEvent) {
-					doEvent = false;
-					return 1;
-				} else
-					return 0;
-			}
-		}, bucketSize);
-
-		final MemoryMXBean jvmMemRT = ManagementFactory.getMemoryMXBean();
-
-		context.registerMetric("memory/heap", new MemoryUsageMetric(new AFn() {
-			public Object invoke() {
-				return jvmMemRT.getHeapMemoryUsage();
-			}
-		}), bucketSize);
-		context.registerMetric("memory/nonHeap", new MemoryUsageMetric(
-				new AFn() {
-					public Object invoke() {
-						return jvmMemRT.getNonHeapMemoryUsage();
-					}
-				}), bucketSize);
-
-		for (GarbageCollectorMXBean b : ManagementFactory
-				.getGarbageCollectorMXBeans()) {
-			context.registerMetric("GC/" + b.getName().replaceAll("\\W", ""),
-					new GarbageCollectorMetric(b), bucketSize);
-		}
-	}
-
-	@Override
-	public void execute(Tuple input) {
-		throw new RuntimeException(
-				"Non-system tuples should never be sent to __system bolt.");
-	}
-
-	@Override
-	public void cleanup() {
-	}
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/metric/api/AssignableMetric.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/metric/api/AssignableMetric.java b/jstorm-client/src/main/java/backtype/storm/metric/api/AssignableMetric.java
deleted file mode 100644
index ed6dc72..0000000
--- a/jstorm-client/src/main/java/backtype/storm/metric/api/AssignableMetric.java
+++ /dev/null
@@ -1,17 +0,0 @@
-package backtype.storm.metric.api;
-
-public class AssignableMetric implements IMetric {
-	Object _value;
-
-	public AssignableMetric(Object value) {
-		_value = value;
-	}
-
-	public void setValue(Object value) {
-		_value = value;
-	}
-
-	public Object getValueAndReset() {
-		return _value;
-	}
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/metric/api/CombinedMetric.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/metric/api/CombinedMetric.java b/jstorm-client/src/main/java/backtype/storm/metric/api/CombinedMetric.java
deleted file mode 100644
index cf74184..0000000
--- a/jstorm-client/src/main/java/backtype/storm/metric/api/CombinedMetric.java
+++ /dev/null
@@ -1,21 +0,0 @@
-package backtype.storm.metric.api;
-
-public class CombinedMetric implements IMetric {
-	private final ICombiner _combiner;
-	private Object _value;
-
-	public CombinedMetric(ICombiner combiner) {
-		_combiner = combiner;
-		_value = _combiner.identity();
-	}
-
-	public void update(Object value) {
-		_value = _combiner.combine(_value, value);
-	}
-
-	public Object getValueAndReset() {
-		Object ret = _value;
-		_value = _combiner.identity();
-		return ret;
-	}
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/metric/api/CountMetric.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/metric/api/CountMetric.java b/jstorm-client/src/main/java/backtype/storm/metric/api/CountMetric.java
deleted file mode 100644
index 12694cd..0000000
--- a/jstorm-client/src/main/java/backtype/storm/metric/api/CountMetric.java
+++ /dev/null
@@ -1,24 +0,0 @@
-package backtype.storm.metric.api;
-
-import backtype.storm.metric.api.IMetric;
-
-public class CountMetric implements IMetric {
-	long _value = 0;
-
-	public CountMetric() {
-	}
-
-	public void incr() {
-		_value++;
-	}
-
-	public void incrBy(long incrementBy) {
-		_value += incrementBy;
-	}
-
-	public Object getValueAndReset() {
-		long ret = _value;
-		_value = 0;
-		return ret;
-	}
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/metric/api/ICombiner.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/metric/api/ICombiner.java b/jstorm-client/src/main/java/backtype/storm/metric/api/ICombiner.java
deleted file mode 100644
index cdc9363..0000000
--- a/jstorm-client/src/main/java/backtype/storm/metric/api/ICombiner.java
+++ /dev/null
@@ -1,7 +0,0 @@
-package backtype.storm.metric.api;
-
-public interface ICombiner<T> {
-	public T identity();
-
-	public T combine(T a, T b);
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/metric/api/IMetric.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/metric/api/IMetric.java b/jstorm-client/src/main/java/backtype/storm/metric/api/IMetric.java
deleted file mode 100644
index cd50757..0000000
--- a/jstorm-client/src/main/java/backtype/storm/metric/api/IMetric.java
+++ /dev/null
@@ -1,5 +0,0 @@
-package backtype.storm.metric.api;
-
-public interface IMetric {
-	public Object getValueAndReset();
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/metric/api/IMetricsConsumer.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/metric/api/IMetricsConsumer.java b/jstorm-client/src/main/java/backtype/storm/metric/api/IMetricsConsumer.java
deleted file mode 100644
index 51b8d5b..0000000
--- a/jstorm-client/src/main/java/backtype/storm/metric/api/IMetricsConsumer.java
+++ /dev/null
@@ -1,56 +0,0 @@
-package backtype.storm.metric.api;
-
-import backtype.storm.task.IErrorReporter;
-import backtype.storm.task.TopologyContext;
-import java.util.Collection;
-import java.util.Map;
-
-public interface IMetricsConsumer {
-	public static class TaskInfo {
-		public TaskInfo() {
-		}
-
-		public TaskInfo(String srcWorkerHost, int srcWorkerPort,
-				String srcComponentId, int srcTaskId, long timestamp,
-				int updateIntervalSecs) {
-			this.srcWorkerHost = srcWorkerHost;
-			this.srcWorkerPort = srcWorkerPort;
-			this.srcComponentId = srcComponentId;
-			this.srcTaskId = srcTaskId;
-			this.timestamp = timestamp;
-			this.updateIntervalSecs = updateIntervalSecs;
-		}
-
-		public String srcWorkerHost;
-		public int srcWorkerPort;
-		public String srcComponentId;
-		public int srcTaskId;
-		public long timestamp;
-		public int updateIntervalSecs;
-	}
-
-	public static class DataPoint {
-		public DataPoint() {
-		}
-
-		public DataPoint(String name, Object value) {
-			this.name = name;
-			this.value = value;
-		}
-
-		@Override
-		public String toString() {
-			return "[" + name + " = " + value + "]";
-		}
-
-		public String name;
-		public Object value;
-	}
-
-	void prepare(Map stormConf, Object registrationArgument,
-			TopologyContext context, IErrorReporter errorReporter);
-
-	void handleDataPoints(TaskInfo taskInfo, Collection<DataPoint> dataPoints);
-
-	void cleanup();
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/metric/api/IReducer.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/metric/api/IReducer.java b/jstorm-client/src/main/java/backtype/storm/metric/api/IReducer.java
deleted file mode 100644
index fe221ae..0000000
--- a/jstorm-client/src/main/java/backtype/storm/metric/api/IReducer.java
+++ /dev/null
@@ -1,9 +0,0 @@
-package backtype.storm.metric.api;
-
-public interface IReducer<T> {
-	T init();
-
-	T reduce(T accumulator, Object input);
-
-	Object extractResult(T accumulator);
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/metric/api/IStatefulObject.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/metric/api/IStatefulObject.java b/jstorm-client/src/main/java/backtype/storm/metric/api/IStatefulObject.java
deleted file mode 100644
index ab37b2c..0000000
--- a/jstorm-client/src/main/java/backtype/storm/metric/api/IStatefulObject.java
+++ /dev/null
@@ -1,5 +0,0 @@
-package backtype.storm.metric.api;
-
-public interface IStatefulObject {
-	Object getState();
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/metric/api/MeanReducer.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/metric/api/MeanReducer.java b/jstorm-client/src/main/java/backtype/storm/metric/api/MeanReducer.java
deleted file mode 100644
index 86f4593..0000000
--- a/jstorm-client/src/main/java/backtype/storm/metric/api/MeanReducer.java
+++ /dev/null
@@ -1,39 +0,0 @@
-package backtype.storm.metric.api;
-
-import backtype.storm.metric.api.IReducer;
-
-class MeanReducerState {
-	public int count = 0;
-	public double sum = 0.0;
-}
-
-public class MeanReducer implements IReducer<MeanReducerState> {
-	public MeanReducerState init() {
-		return new MeanReducerState();
-	}
-
-	public MeanReducerState reduce(MeanReducerState acc, Object input) {
-		acc.count++;
-		if (input instanceof Double) {
-			acc.sum += (Double) input;
-		} else if (input instanceof Long) {
-			acc.sum += ((Long) input).doubleValue();
-		} else if (input instanceof Integer) {
-			acc.sum += ((Integer) input).doubleValue();
-		} else {
-			throw new RuntimeException(
-					"MeanReducer::reduce called with unsupported input type `"
-							+ input.getClass()
-							+ "`. Supported types are Double, Long, Integer.");
-		}
-		return acc;
-	}
-
-	public Object extractResult(MeanReducerState acc) {
-		if (acc.count > 0) {
-			return new Double(acc.sum / (double) acc.count);
-		} else {
-			return null;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/metric/api/MultiCountMetric.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/metric/api/MultiCountMetric.java b/jstorm-client/src/main/java/backtype/storm/metric/api/MultiCountMetric.java
deleted file mode 100644
index f550eeb..0000000
--- a/jstorm-client/src/main/java/backtype/storm/metric/api/MultiCountMetric.java
+++ /dev/null
@@ -1,28 +0,0 @@
-package backtype.storm.metric.api;
-
-import backtype.storm.metric.api.IMetric;
-import java.util.HashMap;
-import java.util.Map;
-
-public class MultiCountMetric implements IMetric {
-	Map<String, CountMetric> _value = new HashMap();
-
-	public MultiCountMetric() {
-	}
-
-	public CountMetric scope(String key) {
-		CountMetric val = _value.get(key);
-		if (val == null) {
-			_value.put(key, val = new CountMetric());
-		}
-		return val;
-	}
-
-	public Object getValueAndReset() {
-		Map ret = new HashMap();
-		for (Map.Entry<String, CountMetric> e : _value.entrySet()) {
-			ret.put(e.getKey(), e.getValue().getValueAndReset());
-		}
-		return ret;
-	}
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/metric/api/MultiReducedMetric.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/metric/api/MultiReducedMetric.java b/jstorm-client/src/main/java/backtype/storm/metric/api/MultiReducedMetric.java
deleted file mode 100644
index 5020fd8..0000000
--- a/jstorm-client/src/main/java/backtype/storm/metric/api/MultiReducedMetric.java
+++ /dev/null
@@ -1,33 +0,0 @@
-package backtype.storm.metric.api;
-
-import backtype.storm.metric.api.IMetric;
-import java.util.HashMap;
-import java.util.Map;
-
-public class MultiReducedMetric implements IMetric {
-	Map<String, ReducedMetric> _value = new HashMap();
-	IReducer _reducer;
-
-	public MultiReducedMetric(IReducer reducer) {
-		_reducer = reducer;
-	}
-
-	public ReducedMetric scope(String key) {
-		ReducedMetric val = _value.get(key);
-		if (val == null) {
-			_value.put(key, val = new ReducedMetric(_reducer));
-		}
-		return val;
-	}
-
-	public Object getValueAndReset() {
-		Map ret = new HashMap();
-		for (Map.Entry<String, ReducedMetric> e : _value.entrySet()) {
-			Object val = e.getValue().getValueAndReset();
-			if (val != null) {
-				ret.put(e.getKey(), val);
-			}
-		}
-		return ret;
-	}
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/metric/api/ReducedMetric.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/metric/api/ReducedMetric.java b/jstorm-client/src/main/java/backtype/storm/metric/api/ReducedMetric.java
deleted file mode 100644
index b2a7bf8..0000000
--- a/jstorm-client/src/main/java/backtype/storm/metric/api/ReducedMetric.java
+++ /dev/null
@@ -1,21 +0,0 @@
-package backtype.storm.metric.api;
-
-public class ReducedMetric implements IMetric {
-	private final IReducer _reducer;
-	private Object _accumulator;
-
-	public ReducedMetric(IReducer reducer) {
-		_reducer = reducer;
-		_accumulator = _reducer.init();
-	}
-
-	public void update(Object value) {
-		_accumulator = _reducer.reduce(_accumulator, value);
-	}
-
-	public Object getValueAndReset() {
-		Object ret = _reducer.extractResult(_accumulator);
-		_accumulator = _reducer.init();
-		return ret;
-	}
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/metric/api/StateMetric.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/metric/api/StateMetric.java b/jstorm-client/src/main/java/backtype/storm/metric/api/StateMetric.java
deleted file mode 100644
index 48170ff..0000000
--- a/jstorm-client/src/main/java/backtype/storm/metric/api/StateMetric.java
+++ /dev/null
@@ -1,14 +0,0 @@
-package backtype.storm.metric.api;
-
-public class StateMetric implements IMetric {
-	private IStatefulObject _obj;
-
-	public StateMetric(IStatefulObject obj) {
-		_obj = obj;
-	}
-
-	@Override
-	public Object getValueAndReset() {
-		return _obj.getState();
-	}
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/metric/api/rpc/AssignableShellMetric.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/metric/api/rpc/AssignableShellMetric.java b/jstorm-client/src/main/java/backtype/storm/metric/api/rpc/AssignableShellMetric.java
deleted file mode 100644
index 20387ed..0000000
--- a/jstorm-client/src/main/java/backtype/storm/metric/api/rpc/AssignableShellMetric.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package backtype.storm.metric.api.rpc;
-
-import backtype.storm.metric.api.AssignableMetric;
-
-public class AssignableShellMetric extends AssignableMetric implements IShellMetric {
-    public AssignableShellMetric(Object value) {
-        super(value);
-    }
-
-    public void updateMetricFromRPC(Object value) {
-        setValue(value);
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/metric/api/rpc/CombinedShellMetric.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/metric/api/rpc/CombinedShellMetric.java b/jstorm-client/src/main/java/backtype/storm/metric/api/rpc/CombinedShellMetric.java
deleted file mode 100644
index 231c571..0000000
--- a/jstorm-client/src/main/java/backtype/storm/metric/api/rpc/CombinedShellMetric.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package backtype.storm.metric.api.rpc;
-
-import backtype.storm.metric.api.CombinedMetric;
-import backtype.storm.metric.api.ICombiner;
-
-public class CombinedShellMetric extends CombinedMetric implements IShellMetric {
-    public CombinedShellMetric(ICombiner combiner) {
-        super(combiner);
-    }
-
-    public void updateMetricFromRPC(Object value) {
-        update(value);
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/metric/api/rpc/CountShellMetric.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/metric/api/rpc/CountShellMetric.java b/jstorm-client/src/main/java/backtype/storm/metric/api/rpc/CountShellMetric.java
deleted file mode 100644
index def74c2..0000000
--- a/jstorm-client/src/main/java/backtype/storm/metric/api/rpc/CountShellMetric.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package backtype.storm.metric.api.rpc;
-
-import backtype.storm.metric.api.CountMetric;
-
-public class CountShellMetric extends CountMetric implements IShellMetric {
-    /***
-     * @param
-     *  params should be null or long
-     *  if value is null, it will call incr()
-     *  if value is long, it will call incrBy((long)params)
-     * */
-    public void updateMetricFromRPC(Object value) {
-        if (value == null) {
-            incr();
-        } else if (value instanceof Long) {
-            incrBy((Long)value);
-        } else {
-            throw new RuntimeException("CountShellMetric updateMetricFromRPC params should be null or Long");
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/metric/api/rpc/IShellMetric.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/metric/api/rpc/IShellMetric.java b/jstorm-client/src/main/java/backtype/storm/metric/api/rpc/IShellMetric.java
deleted file mode 100644
index d53baea..0000000
--- a/jstorm-client/src/main/java/backtype/storm/metric/api/rpc/IShellMetric.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package backtype.storm.metric.api.rpc;
-
-import backtype.storm.metric.api.IMetric;
-
-public interface IShellMetric extends IMetric {
-    /***
-     * @function
-     *     This interface is used by ShellBolt and ShellSpout through RPC call to update Metric 
-     * @param
-     *     value used to update metric, its's meaning change according implementation
-     *     Object can be any json support types: String, Long, Double, Boolean, Null, List, Map
-     * */
-    public void updateMetricFromRPC(Object value);
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/metric/api/rpc/ReducedShellMetric.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/metric/api/rpc/ReducedShellMetric.java b/jstorm-client/src/main/java/backtype/storm/metric/api/rpc/ReducedShellMetric.java
deleted file mode 100644
index 097ed51..0000000
--- a/jstorm-client/src/main/java/backtype/storm/metric/api/rpc/ReducedShellMetric.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package backtype.storm.metric.api.rpc;
-
-import backtype.storm.metric.api.IReducer;
-import backtype.storm.metric.api.ReducedMetric;
-
-public class ReducedShellMetric extends ReducedMetric implements IShellMetric {
-
-    public ReducedShellMetric(IReducer reducer) {
-        super(reducer);
-    }
-
-    public void updateMetricFromRPC(Object value) {
-        update(value);
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/multilang/BoltMsg.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/multilang/BoltMsg.java b/jstorm-client/src/main/java/backtype/storm/multilang/BoltMsg.java
deleted file mode 100644
index 446bdc4..0000000
--- a/jstorm-client/src/main/java/backtype/storm/multilang/BoltMsg.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package backtype.storm.multilang;
-
-import java.util.List;
-
-/**
- * BoltMsg is an object that represents the data sent from a shell component to
- * a bolt process that implements a multi-language protocol. It is the union of
- * all data types that a bolt can receive from Storm.
- *
- * <p>
- * BoltMsgs are objects sent to the ISerializer interface, for serialization
- * according to the wire protocol implemented by the serializer. The BoltMsg
- * class allows for a decoupling between the serialized representation of the
- * data and the data itself.
- * </p>
- */
-public class BoltMsg {
-    private String id;
-    private String comp;
-    private String stream;
-    private long task;
-    private List<Object> tuple;
-
-    public String getId() {
-        return id;
-    }
-
-    public void setId(String id) {
-        this.id = id;
-    }
-
-    public String getComp() {
-        return comp;
-    }
-
-    public void setComp(String comp) {
-        this.comp = comp;
-    }
-
-    public String getStream() {
-        return stream;
-    }
-
-    public void setStream(String stream) {
-        this.stream = stream;
-    }
-
-    public long getTask() {
-        return task;
-    }
-
-    public void setTask(long task) {
-        this.task = task;
-    }
-
-    public List<Object> getTuple() {
-        return tuple;
-    }
-
-    public void setTuple(List<Object> tuple) {
-        this.tuple = tuple;
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/multilang/ISerializer.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/multilang/ISerializer.java b/jstorm-client/src/main/java/backtype/storm/multilang/ISerializer.java
deleted file mode 100644
index c9c7ad4..0000000
--- a/jstorm-client/src/main/java/backtype/storm/multilang/ISerializer.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package backtype.storm.multilang;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.Serializable;
-import java.util.List;
-import java.util.Map;
-
-import backtype.storm.task.TopologyContext;
-
-/**
- * The ISerializer interface describes the methods that an object should
- * implement to provide serialization and de-serialization capabilities to
- * non-JVM language components.
- */
-public interface ISerializer extends Serializable {
-
-    /**
-     * This method sets the input and output streams of the serializer
-     *
-     * @param processIn output stream to non-JVM component
-     * @param processOut input stream from non-JVM component
-     */
-    void initialize(OutputStream processIn, InputStream processOut);
-
-    /**
-     * This method transmits the Storm config to the non-JVM process and
-     * receives its pid.
-     *
-     * @param conf storm configuration
-     * @param context topology context
-     * @return process pid
-     */
-    Number connect(Map conf, TopologyContext context) throws IOException,
-            NoOutputException;
-
-    /**
-     * This method receives a shell message from the non-JVM process
-     *
-     * @return shell message
-     */
-    ShellMsg readShellMsg() throws IOException, NoOutputException;
-
-    /**
-     * This method sends a bolt message to a non-JVM bolt process
-     *
-     * @param msg bolt message
-     */
-    void writeBoltMsg(BoltMsg msg) throws IOException;
-
-    /**
-     * This method sends a spout message to a non-JVM spout process
-     *
-     * @param msg spout message
-     */
-    void writeSpoutMsg(SpoutMsg msg) throws IOException;
-
-    /**
-     * This method sends a list of task IDs to a non-JVM bolt process
-     *
-     * @param taskIds list of task IDs
-     */
-    void writeTaskIds(List<Integer> taskIds) throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/multilang/JsonSerializer.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/multilang/JsonSerializer.java b/jstorm-client/src/main/java/backtype/storm/multilang/JsonSerializer.java
deleted file mode 100644
index 9fca312..0000000
--- a/jstorm-client/src/main/java/backtype/storm/multilang/JsonSerializer.java
+++ /dev/null
@@ -1,202 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package backtype.storm.multilang;
-
-import java.io.BufferedReader;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.OutputStream;
-import java.io.UnsupportedEncodingException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-
-import org.json.simple.JSONObject;
-import org.json.simple.JSONValue;
-
-import backtype.storm.task.TopologyContext;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.utils.Utils;
-
-/**
- * JsonSerializer implements the JSON multilang protocol.
- */
-public class JsonSerializer implements ISerializer {
-    private DataOutputStream processIn;
-    private BufferedReader processOut;
-
-    public void initialize(OutputStream processIn, InputStream processOut) {
-        this.processIn = new DataOutputStream(processIn);
-        try {
-            this.processOut = new BufferedReader(new InputStreamReader(processOut, "UTF-8"));
-        } catch (UnsupportedEncodingException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    public Number connect(Map conf, TopologyContext context)
-            throws IOException, NoOutputException {
-        JSONObject setupInfo = new JSONObject();
-        setupInfo.put("pidDir", context.getPIDDir());
-        setupInfo.put("conf", conf);
-        setupInfo.put("context", context);
-        writeMessage(setupInfo);
-
-        Number pid = (Number) ((JSONObject) readMessage()).get("pid");
-        return pid;
-    }
-
-    public void writeBoltMsg(BoltMsg boltMsg) throws IOException {
-        JSONObject obj = new JSONObject();
-        obj.put("id", boltMsg.getId());
-        obj.put("comp", boltMsg.getComp());
-        obj.put("stream", boltMsg.getStream());
-        obj.put("task", boltMsg.getTask());
-        obj.put("tuple", boltMsg.getTuple());
-        writeMessage(obj);
-    }
-
-    public void writeSpoutMsg(SpoutMsg msg) throws IOException {
-        JSONObject obj = new JSONObject();
-        obj.put("command", msg.getCommand());
-        obj.put("id", msg.getId());
-        writeMessage(obj);
-    }
-
-    public void writeTaskIds(List<Integer> taskIds) throws IOException {
-        writeMessage(taskIds);
-    }
-
-    private void writeMessage(Object msg) throws IOException {
-        writeString(JSONValue.toJSONString(msg));
-    }
-
-    private void writeString(String str) throws IOException {
-        byte[] strBytes = str.getBytes("UTF-8");
-        processIn.write(strBytes, 0, strBytes.length);
-        processIn.writeBytes("\nend\n");
-        processIn.flush();
-    }
-
-    public ShellMsg readShellMsg() throws IOException, NoOutputException {
-        JSONObject msg = (JSONObject) readMessage();
-        ShellMsg shellMsg = new ShellMsg();
-
-        String command = (String) msg.get("command");
-        shellMsg.setCommand(command);
-
-        Object id = msg.get("id");
-        shellMsg.setId(id);
-
-        String log = (String) msg.get("msg");
-        shellMsg.setMsg(log);
-
-        String stream = (String) msg.get("stream");
-        if (stream == null)
-            stream = Utils.DEFAULT_STREAM_ID;
-        shellMsg.setStream(stream);
-
-        Object taskObj = msg.get("task");
-        if (taskObj != null) {
-            shellMsg.setTask((Long) taskObj);
-        } else {
-            shellMsg.setTask(0);
-        }
-
-        Object need_task_ids = msg.get("need_task_ids");
-        if (need_task_ids == null || ((Boolean) need_task_ids).booleanValue()) {
-            shellMsg.setNeedTaskIds(true);
-        } else {
-            shellMsg.setNeedTaskIds(false);
-        }
-
-        shellMsg.setTuple((List) msg.get("tuple"));
-
-        //List<Tuple> anchors = new ArrayList<Tuple>();
-        Object anchorObj = msg.get("anchors");
-        if (anchorObj != null) {
-            if (anchorObj instanceof String) {
-                anchorObj = Arrays.asList(anchorObj);
-            }
-            for (Object o : (List) anchorObj) {
-                shellMsg.addAnchor((String) o);
-            }
-        }
-       
-        Object nameObj = msg.get("name"); 
-        String metricName = null;
-        if (nameObj != null && nameObj instanceof String) {
-            metricName = (String) nameObj;
-        }
-        shellMsg.setMetricName(metricName);
-        
-        Object paramsObj = msg.get("params");
-        shellMsg.setMetricParams(paramsObj);
-
-        if (command.equals("log")) {
-            Object logLevelObj = msg.get("level");
-            if (logLevelObj != null && logLevelObj instanceof Long) {
-                long logLevel = (Long)logLevelObj;
-                shellMsg.setLogLevel((int)logLevel);
-            }
-        }
-
-        return shellMsg;
-    }
-
-    private Object readMessage() throws IOException, NoOutputException {
-        String string = readString();
-        Object msg = JSONValue.parse(string);
-        if (msg != null) {
-            return msg;
-        } else {
-            throw new IOException("unable to parse: " + string);
-        }
-    }
-
-    private String readString() throws IOException, NoOutputException {
-        StringBuilder line = new StringBuilder();
-
-        while (true) {
-            String subline = processOut.readLine();
-            if (subline == null) {
-                StringBuilder errorMessage = new StringBuilder();
-                errorMessage.append("Pipe to subprocess seems to be broken!");
-                if (line.length() == 0) {
-                    errorMessage.append(" No output read.\n");
-                } else {
-                    errorMessage.append(" Currently read output: "
-                            + line.toString() + "\n");
-                }
-                errorMessage.append("Serializer Exception:\n");
-                throw new NoOutputException(errorMessage.toString());
-            }
-            if (subline.equals("end")) {
-                break;
-            }
-            if (line.length() != 0) {
-                line.append("\n");
-            }
-            line.append(subline);
-        }
-        return line.toString();
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/multilang/NoOutputException.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/multilang/NoOutputException.java b/jstorm-client/src/main/java/backtype/storm/multilang/NoOutputException.java
deleted file mode 100644
index 1ce75d3..0000000
--- a/jstorm-client/src/main/java/backtype/storm/multilang/NoOutputException.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package backtype.storm.multilang;
-
-/**
- * A NoOutputException states that no data has been received from the connected
- * non-JVM process.
- */
-public class NoOutputException extends Exception {
-    public NoOutputException() {
-        super();
-    }
-
-    public NoOutputException(String message) {
-        super(message);
-    }
-
-    public NoOutputException(String message, Throwable cause) {
-        super(message, cause);
-    }
-
-    public NoOutputException(Throwable cause) {
-        super(cause);
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/multilang/ShellMsg.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/multilang/ShellMsg.java b/jstorm-client/src/main/java/backtype/storm/multilang/ShellMsg.java
deleted file mode 100644
index 9eafb1a..0000000
--- a/jstorm-client/src/main/java/backtype/storm/multilang/ShellMsg.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package backtype.storm.multilang;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * ShellMsg is an object that represents the data sent to a shell component from
- * a process that implements a multi-language protocol. It is the union of all
- * data types that a component can send to Storm.
- *
- * <p>
- * ShellMsgs are objects received from the ISerializer interface, after the
- * serializer has deserialized the data from the underlying wire protocol. The
- * ShellMsg class allows for a decoupling between the serialized representation
- * of the data and the data itself.
- * </p>
- */
-public class ShellMsg {
-    private String command;
-    private Object id;
-    private List<String> anchors;
-    private String stream;
-    private long task;
-    private String msg;
-    private List<Object> tuple;
-    private boolean needTaskIds;
-
-    //metrics rpc 
-    private String metricName;
-    private Object metricParams;
-
-    //logLevel
-    public enum ShellLogLevel {
-        TRACE, DEBUG, INFO, WARN, ERROR;
-
-        public static ShellLogLevel fromInt(int i) {
-            switch (i) {
-                case 0: return TRACE;
-                case 1: return DEBUG;
-                case 2: return INFO;
-                case 3: return WARN;
-                case 4: return ERROR;
-                default: return INFO;
-            }
-        }
-    }
-
-    private ShellLogLevel logLevel = ShellLogLevel.INFO;
-
-    public String getCommand() {
-        return command;
-    }
-
-    public void setCommand(String command) {
-        this.command = command;
-    }
-
-    public Object getId() {
-        return id;
-    }
-
-    public void setId(Object id) {
-        this.id = id;
-    }
-
-    public List<String> getAnchors() {
-        return anchors;
-    }
-
-    public void setAnchors(List<String> anchors) {
-        this.anchors = anchors;
-    }
-
-    public void addAnchor(String anchor) {
-        if (anchors == null) {
-            anchors = new ArrayList<String>();
-        }
-        this.anchors.add(anchor);
-    }
-
-    public String getStream() {
-        return stream;
-    }
-
-    public void setStream(String stream) {
-        this.stream = stream;
-    }
-
-    public long getTask() {
-        return task;
-    }
-
-    public void setTask(long task) {
-        this.task = task;
-    }
-
-    public String getMsg() {
-        return msg;
-    }
-
-    public void setMsg(String msg) {
-        this.msg = msg;
-    }
-
-    public List<Object> getTuple() {
-        return tuple;
-    }
-
-    public void setTuple(List<Object> tuple) {
-        this.tuple = tuple;
-    }
-
-    public void addTuple(Object tuple) {
-        if (this.tuple == null) {
-            this.tuple = new ArrayList<Object>();
-        }
-        this.tuple.add(tuple);
-    }
-
-    public boolean areTaskIdsNeeded() {
-        return needTaskIds;
-    }
-
-    public void setNeedTaskIds(boolean needTaskIds) {
-        this.needTaskIds = needTaskIds;
-    }
-
-    public void setMetricName(String metricName) {
-        this.metricName = metricName;
-    }
-
-    public String getMetricName() {
-        return this.metricName;
-    }
-
-    public void setMetricParams(Object metricParams) {
-        this.metricParams = metricParams;
-    }
-
-    public Object getMetricParams() {
-        return metricParams;
-    }
-
-    public ShellLogLevel getLogLevel() {
-        return logLevel;
-    }
-
-    public void setLogLevel(int logLevel) {
-        this.logLevel = ShellLogLevel.fromInt(logLevel);
-    }
-
-    @Override
-    public String toString() {
-        return "ShellMsg{" +
-                "command='" + command + '\'' +
-                ", id=" + id +
-                ", anchors=" + anchors +
-                ", stream='" + stream + '\'' +
-                ", task=" + task +
-                ", msg='" + msg + '\'' +
-                ", tuple=" + tuple +
-                ", needTaskIds=" + needTaskIds +
-                ", metricName='" + metricName + '\'' +
-                ", metricParams=" + metricParams +
-                ", logLevel=" + logLevel +
-                '}';
-    }
-}