You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2017/04/06 17:32:52 UTC

[03/52] [partial] storm git commit: STORM-2441 Break down 'storm-core' to extract client (worker) artifacts

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/grouping/ShuffleGrouping.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/grouping/ShuffleGrouping.java b/storm-client/src/jvm/org/apache/storm/grouping/ShuffleGrouping.java
new file mode 100644
index 0000000..ad8014c
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/grouping/ShuffleGrouping.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.grouping;
+
+import org.apache.storm.generated.GlobalStreamId;
+import org.apache.storm.task.WorkerTopologyContext;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Random;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class ShuffleGrouping implements CustomStreamGrouping, Serializable {
+    private Random random; // created in prepare() and used there to shuffle choices
+    private ArrayList<List<Integer>> choices; // one single-element list per target task, in shuffled order
+    private AtomicInteger current; // round-robin cursor into choices, advanced on every chooseTasks() call
+
+    @Override
+    public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
+        random = new Random();
+        choices = new ArrayList<List<Integer>>(targetTasks.size());
+        for (Integer i: targetTasks) {
+            choices.add(Arrays.asList(i)); // pre-wrap each task id so chooseTasks() can return it as-is
+        }
+        Collections.shuffle(choices, random);
+        current = new AtomicInteger(0);
+    }
+
+    @Override
+    public List<Integer> chooseTasks(int taskId, List<Object> values) {
+        int rightNow;
+        int size = choices.size(); // NOTE(review): if size is 0 neither branch below can match (rightNow starts at 1) - confirm callers never pass an empty targetTasks
+        while (true) {
+            rightNow = current.incrementAndGet();
+            if (rightNow < size) {
+                return choices.get(rightNow);
+            } else if (rightNow == size) {
+                current.set(0); // this thread reached the end of the list: wrap the cursor back to the first entry
+                return choices.get(0);
+            }
+            // rightNow > size: another thread reached the wrap-around point first and has
+            // not reset the counter yet, so loop and try again.
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/hooks/BaseTaskHook.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/hooks/BaseTaskHook.java b/storm-client/src/jvm/org/apache/storm/hooks/BaseTaskHook.java
new file mode 100644
index 0000000..3eee8d8
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/hooks/BaseTaskHook.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hooks;
+
+import org.apache.storm.hooks.info.BoltAckInfo;
+import org.apache.storm.hooks.info.BoltExecuteInfo;
+import org.apache.storm.hooks.info.BoltFailInfo;
+import org.apache.storm.hooks.info.EmitInfo;
+import org.apache.storm.hooks.info.SpoutAckInfo;
+import org.apache.storm.hooks.info.SpoutFailInfo;
+import org.apache.storm.task.TopologyContext;
+import java.util.Map;
+
+public class BaseTaskHook implements ITaskHook { // no-op ITaskHook: every callback below has an empty body; subclasses can override just the ones they care about
+    @Override
+    public void prepare(Map conf, TopologyContext context) {
+    }
+
+    @Override
+    public void cleanup() {
+    }    
+
+    @Override
+    public void emit(EmitInfo info) {
+    }
+
+    @Override
+    public void spoutAck(SpoutAckInfo info) {
+    }
+
+    @Override
+    public void spoutFail(SpoutFailInfo info) {
+    }
+
+    @Override
+    public void boltAck(BoltAckInfo info) {
+    }
+
+    @Override
+    public void boltFail(BoltFailInfo info) {
+    }
+
+    @Override
+    public void boltExecute(BoltExecuteInfo info) {
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/hooks/BaseWorkerHook.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/hooks/BaseWorkerHook.java b/storm-client/src/jvm/org/apache/storm/hooks/BaseWorkerHook.java
new file mode 100644
index 0000000..e178280
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/hooks/BaseWorkerHook.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hooks;
+
+import org.apache.storm.task.WorkerTopologyContext;
+
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * A BaseWorkerHook is a no-op implementation of IWorkerHook. You
+ * may extend this class and override whichever methods you
+ * need for your workers.
+ */
+public class BaseWorkerHook implements IWorkerHook, Serializable {
+    private static final long serialVersionUID = 2589466485198339529L;
+
+    /**
+     * This method is called when a worker is started.
+     *
+     * @param stormConf The Storm configuration for this worker
+     * @param context This object can be used to get information about this worker's place within the topology
+     */
+    @Override
+    public void start(Map stormConf, WorkerTopologyContext context) {
+        // NOOP
+    }
+
+    /**
+     * This method is called right before a worker shuts down.
+     */
+    @Override
+    public void shutdown() {
+        // NOOP
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/hooks/ITaskHook.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/hooks/ITaskHook.java b/storm-client/src/jvm/org/apache/storm/hooks/ITaskHook.java
new file mode 100644
index 0000000..519af0c
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/hooks/ITaskHook.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hooks;
+
+import org.apache.storm.hooks.info.BoltAckInfo;
+import org.apache.storm.hooks.info.BoltExecuteInfo;
+import org.apache.storm.hooks.info.SpoutFailInfo;
+import org.apache.storm.hooks.info.SpoutAckInfo;
+import org.apache.storm.hooks.info.EmitInfo;
+import org.apache.storm.hooks.info.BoltFailInfo;
+import org.apache.storm.task.TopologyContext;
+import java.util.Map;
+
+public interface ITaskHook { // callback interface; the *Info classes in org.apache.storm.hooks.info call the matching method from their applyOn(TopologyContext)
+    void prepare(Map conf, TopologyContext context);
+    void cleanup();
+    void emit(EmitInfo info); // invoked by EmitInfo.applyOn
+    void spoutAck(SpoutAckInfo info); // invoked by SpoutAckInfo.applyOn
+    void spoutFail(SpoutFailInfo info); // invoked by SpoutFailInfo.applyOn
+    void boltExecute(BoltExecuteInfo info); // invoked by BoltExecuteInfo.applyOn
+    void boltAck(BoltAckInfo info); // invoked by BoltAckInfo.applyOn
+    void boltFail(BoltFailInfo info); // invoked by BoltFailInfo.applyOn
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/hooks/IWorkerHook.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/hooks/IWorkerHook.java b/storm-client/src/jvm/org/apache/storm/hooks/IWorkerHook.java
new file mode 100644
index 0000000..5f49fe3
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/hooks/IWorkerHook.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hooks;
+
+import org.apache.storm.task.WorkerTopologyContext;
+
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * An IWorkerHook represents a topology component that can be executed
+ * when a worker starts, and when a worker shuts down. It can be useful
+ * when you want to execute operations before topology processing starts,
+ * or cleanup operations before your workers shut down.
+ */
+public interface IWorkerHook extends Serializable {
+    /**
+     * This method is called when a worker is started.
+     *
+     * @param stormConf The Storm configuration for this worker
+     * @param context This object can be used to get information about this worker's place within the topology
+     */
+    void start(Map stormConf, WorkerTopologyContext context);
+
+    /**
+     * This method is called right before a worker shuts down.
+     */
+    void shutdown();
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/hooks/SubmitterHookException.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/hooks/SubmitterHookException.java b/storm-client/src/jvm/org/apache/storm/hooks/SubmitterHookException.java
new file mode 100644
index 0000000..b7679a7
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/hooks/SubmitterHookException.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.hooks;
+
+/**
+ * Thrown when a registered {@link org.apache.storm.ISubmitterHook} cannot be initialized or invoked.
+ */
+public class SubmitterHookException extends RuntimeException {
+
+    public SubmitterHookException() {
+    }
+
+    public SubmitterHookException(String message) {
+        super(message);
+    }
+
+    public SubmitterHookException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public SubmitterHookException(Throwable cause) {
+        super(cause);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/hooks/info/BoltAckInfo.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/hooks/info/BoltAckInfo.java b/storm-client/src/jvm/org/apache/storm/hooks/info/BoltAckInfo.java
new file mode 100644
index 0000000..e6f4b11
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/hooks/info/BoltAckInfo.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hooks.info;
+
+import org.apache.storm.hooks.ITaskHook;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.tuple.Tuple;
+
+public class BoltAckInfo { // holder (public fields) for the data handed to ITaskHook.boltAck
+    public Tuple tuple;
+    public int ackingTaskId;
+    public Long processLatencyMs; // null if it wasn't sampled
+    
+    public BoltAckInfo(Tuple tuple, int ackingTaskId, Long processLatencyMs) {
+        this.tuple = tuple;
+        this.ackingTaskId = ackingTaskId;
+        this.processLatencyMs = processLatencyMs;
+    }
+
+    public void applyOn(TopologyContext topologyContext) { // calls boltAck(this) on every hook returned by topologyContext.getHooks()
+        for (ITaskHook hook : topologyContext.getHooks()) {
+            hook.boltAck(this);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/hooks/info/BoltExecuteInfo.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/hooks/info/BoltExecuteInfo.java b/storm-client/src/jvm/org/apache/storm/hooks/info/BoltExecuteInfo.java
new file mode 100644
index 0000000..73a7f33
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/hooks/info/BoltExecuteInfo.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hooks.info;
+
+import org.apache.storm.hooks.ITaskHook;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.tuple.Tuple;
+
+public class BoltExecuteInfo { // holder (public fields) for the data handed to ITaskHook.boltExecute
+    public Tuple tuple;
+    public int executingTaskId;
+    public Long executeLatencyMs; // null if it wasn't sampled
+    
+    public BoltExecuteInfo(Tuple tuple, int executingTaskId, Long executeLatencyMs) {
+        this.tuple = tuple;
+        this.executingTaskId = executingTaskId;
+        this.executeLatencyMs = executeLatencyMs;
+    }
+
+    public void applyOn(TopologyContext topologyContext) { // calls boltExecute(this) on every hook returned by topologyContext.getHooks()
+        for (ITaskHook hook : topologyContext.getHooks()) {
+            hook.boltExecute(this);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/hooks/info/BoltFailInfo.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/hooks/info/BoltFailInfo.java b/storm-client/src/jvm/org/apache/storm/hooks/info/BoltFailInfo.java
new file mode 100644
index 0000000..4e1e32d
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/hooks/info/BoltFailInfo.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hooks.info;
+
+import org.apache.storm.hooks.ITaskHook;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.tuple.Tuple;
+
+public class BoltFailInfo { // holder (public fields) for the data handed to ITaskHook.boltFail
+    public Tuple tuple;
+    public int failingTaskId;
+    public Long failLatencyMs; // null if it wasn't sampled
+    
+    public BoltFailInfo(Tuple tuple, int failingTaskId, Long failLatencyMs) {
+        this.tuple = tuple;
+        this.failingTaskId = failingTaskId;
+        this.failLatencyMs = failLatencyMs;
+    }
+
+    public void applyOn(TopologyContext topologyContext) { // calls boltFail(this) on every hook returned by topologyContext.getHooks()
+        for (ITaskHook hook : topologyContext.getHooks()) {
+            hook.boltFail(this);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/hooks/info/EmitInfo.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/hooks/info/EmitInfo.java b/storm-client/src/jvm/org/apache/storm/hooks/info/EmitInfo.java
new file mode 100644
index 0000000..52965a1
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/hooks/info/EmitInfo.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hooks.info;
+
+import org.apache.storm.hooks.ITaskHook;
+import org.apache.storm.task.TopologyContext;
+
+import java.util.Collection;
+import java.util.List;
+
+public class EmitInfo { // holder (public fields) for the data handed to ITaskHook.emit
+    public List<Object> values;
+    public String stream;
+    public int taskId;
+    public Collection<Integer> outTasks;
+    
+    public EmitInfo(List<Object> values, String stream, int taskId, Collection<Integer> outTasks) {
+        this.values = values;
+        this.stream = stream;
+        this.taskId = taskId;
+        this.outTasks = outTasks;
+    }
+
+    public void applyOn(TopologyContext topologyContext) { // calls emit(this) on every hook returned by topologyContext.getHooks()
+        for (ITaskHook hook : topologyContext.getHooks()) {
+            hook.emit(this);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/hooks/info/SpoutAckInfo.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/hooks/info/SpoutAckInfo.java b/storm-client/src/jvm/org/apache/storm/hooks/info/SpoutAckInfo.java
new file mode 100644
index 0000000..4949f0f
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/hooks/info/SpoutAckInfo.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hooks.info;
+
+import org.apache.storm.hooks.ITaskHook;
+import org.apache.storm.task.TopologyContext;
+
+public class SpoutAckInfo { // holder (public fields) for the data handed to ITaskHook.spoutAck
+    public Object messageId;
+    public int spoutTaskId;
+    public Long completeLatencyMs; // null if it wasn't sampled
+    
+    public SpoutAckInfo(Object messageId, int spoutTaskId, Long completeLatencyMs) {
+        this.messageId = messageId;
+        this.spoutTaskId = spoutTaskId;
+        this.completeLatencyMs = completeLatencyMs;
+    }
+
+    public void applyOn(TopologyContext topologyContext) { // calls spoutAck(this) on every hook returned by topologyContext.getHooks()
+        for (ITaskHook hook : topologyContext.getHooks()) {
+            hook.spoutAck(this);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/hooks/info/SpoutFailInfo.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/hooks/info/SpoutFailInfo.java b/storm-client/src/jvm/org/apache/storm/hooks/info/SpoutFailInfo.java
new file mode 100644
index 0000000..5b40005
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/hooks/info/SpoutFailInfo.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hooks.info;
+
+import org.apache.storm.hooks.ITaskHook;
+import org.apache.storm.task.TopologyContext;
+
+public class SpoutFailInfo { // holder (public fields) for the data handed to ITaskHook.spoutFail
+    public Object messageId;
+    public int spoutTaskId;
+    public Long failLatencyMs; // null if it wasn't sampled
+    
+    public SpoutFailInfo(Object messageId, int spoutTaskId, Long failLatencyMs) {
+        this.messageId = messageId;
+        this.spoutTaskId = spoutTaskId;
+        this.failLatencyMs = failLatencyMs;
+    }
+
+    public void applyOn(TopologyContext topologyContext) { // calls spoutFail(this) on every hook returned by topologyContext.getHooks()
+        for (ITaskHook hook : topologyContext.getHooks()) {
+            hook.spoutFail(this);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/messaging/ConnectionWithStatus.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/ConnectionWithStatus.java b/storm-client/src/jvm/org/apache/storm/messaging/ConnectionWithStatus.java
new file mode 100644
index 0000000..ba547ac
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/messaging/ConnectionWithStatus.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.messaging;
+
+public abstract class ConnectionWithStatus implements IConnection {
+
+  public static enum Status {
+
+    /**
+     * We are establishing an active connection with the target host. New data
+     * sending requests can be buffered for future sending, or dropped (e.g. when
+     * there is not enough memory). This varies between different IConnection
+     * implementations.
+     */
+    Connecting,
+
+    /**
+     * We have a live connection channel, which can be used to transfer data.
+     */
+    Ready,
+
+    /**
+     * The connection channel is closed or being closed. We don't accept further
+     * data sending or receiving. All data sending requests will be dropped.
+     */
+    Closed
+  }
+
+    /**
+   * Returns the current {@link Status}, i.e. whether this connection is available to transfer data.
+   */
+  public abstract Status status();
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/messaging/DeserializingConnectionCallback.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/DeserializingConnectionCallback.java b/storm-client/src/jvm/org/apache/storm/messaging/DeserializingConnectionCallback.java
new file mode 100644
index 0000000..8f415c2
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/messaging/DeserializingConnectionCallback.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.messaging;
+
+import org.apache.storm.daemon.worker.WorkerState;
+import org.apache.storm.task.GeneralTopologyContext;
+import org.apache.storm.tuple.AddressedTuple;
+import org.apache.storm.serialization.KryoTupleDeserializer;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Callback invoked with a batch of TaskMessages: deserializes each one into an AddressedTuple and passes the resulting list to the transfer callback.
+ */
+public class DeserializingConnectionCallback implements IConnectionCallback {
+    private final WorkerState.ILocalTransferCallback _cb;
+    private final Map _conf;
+    private final GeneralTopologyContext _context;
+    private final ThreadLocal<KryoTupleDeserializer> _des = // one KryoTupleDeserializer per calling thread, built from _conf and _context on first use
+         new ThreadLocal<KryoTupleDeserializer>() {
+             @Override
+             protected KryoTupleDeserializer initialValue() {
+                 return new KryoTupleDeserializer(_conf, _context);
+             }
+         };
+
+    public DeserializingConnectionCallback(final Map conf, final GeneralTopologyContext context, WorkerState.ILocalTransferCallback callback) {
+        _conf = conf;
+        _context = context;
+        _cb = callback;
+    }
+
+    @Override
+    public void recv(List<TaskMessage> batch) {
+        KryoTupleDeserializer des = _des.get();
+        ArrayList<AddressedTuple> ret = new ArrayList<>(batch.size());
+        for (TaskMessage message: batch) {
+            ret.add(new AddressedTuple(message.task(), des.deserialize(message.message()))); // pair the destination task id with the deserialized payload
+        }
+        _cb.transfer(ret); // hand the whole batch over in a single call
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/messaging/IConnection.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/IConnection.java b/storm-client/src/jvm/org/apache/storm/messaging/IConnection.java
new file mode 100644
index 0000000..7042dc3
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/messaging/IConnection.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.messaging;
+
+import org.apache.storm.grouping.Load;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+
+public interface IConnection { // a messaging connection, as returned by IContext.bind and IContext.connect
+    /**
+     * Register a callback to be notified when data is ready to be processed.
+     * @param cb the callback to process the messages.
+     */
+    public void registerRecv(IConnectionCallback cb);
+
+    /**
+     * Send load metrics to all downstream connections.
+     * @param taskToLoad a map from the task id to the load for that task.
+     */
+    public void sendLoadMetrics(Map<Integer, Double> taskToLoad);
+    
+    /**
+     * Send a single message, identified by its task ID and payload.
+     * @param taskId task ID
+     * @param payload the message payload bytes
+     */
+    public void send(int taskId,  byte[] payload);
+    
+    /**
+     * Send a batch of messages.
+     * @param msgs iterator over the messages to send
+     */
+
+    public void send(Iterator<TaskMessage> msgs);
+    
+    /**
+     * Get the current load for the given tasks.
+     * @param tasks the tasks to look for.
+     * @return a Load for each of the tasks it knows about.
+     */
+    public Map<Integer, Load> getLoad(Collection<Integer> tasks);
+
+    /**
+     * Close this connection.
+     */
+    public void close();
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/messaging/IConnectionCallback.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/IConnectionCallback.java b/storm-client/src/jvm/org/apache/storm/messaging/IConnectionCallback.java
new file mode 100644
index 0000000..3224f2e
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/messaging/IConnectionCallback.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.messaging;
+
+import java.util.List;
+
+/**
+ * A callback that is invoked when a batch of TaskMessages arrives.
+ */
+public interface IConnectionCallback {
+    /**
+     * A batch of new messages has arrived to be processed.
+     * @param batch the messages to be processed
+     */
+    public void recv(List<TaskMessage> batch);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/messaging/IContext.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/IContext.java b/storm-client/src/jvm/org/apache/storm/messaging/IContext.java
new file mode 100644
index 0000000..72812b1
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/messaging/IContext.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.messaging;
+
+import java.util.Map;
+
+/**
+ * This interface needs to be implemented by a messaging plugin.
+ *
+ * The messaging plugin is specified via the Storm config parameter storm.messaging.transport.
+ *
+ * A messaging plugin should have a default constructor and implement the IContext interface.
+ * Upon construction, we will invoke IContext::prepare(storm_conf) to enable the context to be configured
+ * according to the storm configuration.
+ */
+public interface IContext {
+    /**
+     * This method is invoked at the startup of the messaging plugin.
+     * @param storm_conf storm configuration
+     */
+    public void prepare(Map storm_conf);
+    
+    /**
+     * This method is invoked when a worker unloads a messaging plugin.
+     */
+    public void term();
+
+    /**
+     * This method establishes a server side connection.
+     * @param storm_id topology ID
+     * @param port port #
+     * @return server side connection
+     */
+    public IConnection bind(String storm_id, int port);
+    
+    /**
+     * This method establishes a client side connection to a remote server.
+     * @param storm_id topology ID
+     * @param host remote host
+     * @param port remote port
+     * @return client side connection
+     */
+    public IConnection connect(String storm_id, String host, int port);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/messaging/TaskMessage.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/TaskMessage.java b/storm-client/src/jvm/org/apache/storm/messaging/TaskMessage.java
new file mode 100644
index 0000000..98aba31
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/messaging/TaskMessage.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.messaging;
+
+import java.nio.ByteBuffer;
+
+public class TaskMessage { // a task id paired with an opaque message payload
+    private int _task;
+    private byte[] _message;
+    // Creates a message for the given task; the array is stored as-is (not copied).
+    public TaskMessage(int task, byte[] message) {
+        _task = task;
+        _message = message;
+    }
+    // Returns the task id.
+    public int task() {
+        return _task;
+    }
+    // Returns the payload array itself (no defensive copy).
+    public byte[] message() {
+        return _message;
+    }
+    // Encodes a 2-byte task id followed by the payload; the buffer is returned un-flipped (position at its end).
+    public ByteBuffer serialize() {
+        ByteBuffer bb = ByteBuffer.allocate(_message.length+2);
+        bb.putShort((short)_task); // the cast keeps only the low 16 bits of the task id
+        bb.put(_message);
+        return bb;
+    }
+    // Decodes the layout written by serialize(), reading from the packet's current position up to its limit; null is ignored.
+    public void deserialize(ByteBuffer packet) {
+        if (packet==null) return;
+        _task = packet.getShort();
+        _message = new byte[packet.remaining()]; // everything after the task id, wherever the buffer's position started
+        packet.get(_message);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/messaging/TransportFactory.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/TransportFactory.java b/storm-client/src/jvm/org/apache/storm/messaging/TransportFactory.java
new file mode 100644
index 0000000..511257d
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/messaging/TransportFactory.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.messaging;
+
+import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.lang.reflect.Method;
+import org.apache.storm.Config;
+
+public class TransportFactory {
+    public static final Logger LOG = LoggerFactory.getLogger(TransportFactory.class);
+    /** Builds the messaging plugin named by Config.STORM_MESSAGING_TRANSPORT; any failure is rethrown as a RuntimeException. */
+    public static IContext makeContext(Map storm_conf) {
+
+        //get factory class name
+        String transport_plugin_klassName = (String)storm_conf.get(Config.STORM_MESSAGING_TRANSPORT);
+        LOG.info("Storm peer transport plugin:"+transport_plugin_klassName);
+
+        IContext transport;
+        try {
+            //create a factory class
+            Class klass = Class.forName(transport_plugin_klassName);
+            //obtain a context object (the plugin class needs an accessible no-arg constructor)
+            Object obj = klass.newInstance();
+            if (obj instanceof IContext) {
+                //case 1: plugin is an IContext class
+                transport = (IContext)obj;
+                //initialize with storm configuration
+                transport.prepare(storm_conf);
+            } else {
+                //case 2: Non-IContext plugin must have a makeContext(storm_conf) method that returns an IContext object
+                Method method = klass.getMethod("makeContext", Map.class);
+                LOG.debug("object:"+obj+" method:"+method);
+                transport = (IContext) method.invoke(obj, storm_conf); // prepare() is not called on this path
+            }
+        } catch(Exception e) {
+            throw new RuntimeException("Fail to construct messaging plugin from plugin "+transport_plugin_klassName, e);
+        } 
+        return transport;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/messaging/local/Context.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/local/Context.java b/storm-client/src/jvm/org/apache/storm/messaging/local/Context.java
new file mode 100644
index 0000000..7300847
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/messaging/local/Context.java
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.messaging.local;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import org.apache.storm.grouping.Load;
+import org.apache.storm.messaging.IConnection;
+import org.apache.storm.messaging.TaskMessage;
+import org.apache.storm.messaging.IConnectionCallback;
+import org.apache.storm.messaging.IContext;
+
+public class Context implements IContext { // in-process messaging plugin: clients invoke the server's callback directly
+    private static final Logger LOG = LoggerFactory.getLogger(Context.class);
+
+    private static class LocalServer implements IConnection { // receiving side: holds the callback and the reported load
+        volatile IConnectionCallback _cb; // null until registerRecv is called
+        final ConcurrentHashMap<Integer, Double> _load = new ConcurrentHashMap<>(); // task id -> last reported load
+
+        @Override
+        public void registerRecv(IConnectionCallback cb) {
+            _cb = cb;
+        }
+
+        @Override
+        public void send(int taskId,  byte[] payload) { // a server never sends; sending goes through LocalClient
+            throw new IllegalArgumentException("SHOULD NOT HAPPEN");
+        }
+
+        @Override
+        public void send(Iterator<TaskMessage> msgs) {
+            throw new IllegalArgumentException("SHOULD NOT HAPPEN");
+        }
+
+        @Override
+        public Map<Integer, Load> getLoad(Collection<Integer> tasks) { // tasks without a reported load are omitted
+            Map<Integer, Load> ret = new HashMap<>();
+            for (Integer task : tasks) {
+                Double found = _load.get(task);
+                if (found != null) {
+                    ret.put(task, new Load(true, found, 0));
+                }
+            }
+            return ret;
+        }
+
+        @Override
+        public void sendLoadMetrics(Map<Integer, Double> taskToLoad) {
+            _load.putAll(taskToLoad);
+        }
+
+        @Override
+        public void close() {
+            //NOOP
+        }
+    };
+
+    private static class LocalClient implements IConnection { // sending side: delivers straight to the LocalServer's callback
+        private final LocalServer _server;
+        //Messages sent before the server registered a callback
+        private final LinkedBlockingQueue<TaskMessage> _pendingDueToUnregisteredServer;
+        private final ScheduledExecutorService _pendingFlusher; // single daemon thread that periodically retries delivery
+
+        public LocalClient(LocalServer server) {
+            _server = server;
+            _pendingDueToUnregisteredServer = new LinkedBlockingQueue<>();
+            _pendingFlusher = Executors.newScheduledThreadPool(1, new ThreadFactory(){
+                @Override
+                public Thread newThread(Runnable runnable) {
+                    Thread thread = new Thread(runnable);
+                    thread.setName("LocalClientFlusher-" + thread.getId());
+                    thread.setDaemon(true);
+                    return thread;
+                }
+            });
+            _pendingFlusher.scheduleAtFixedRate(new Runnable(){
+                @Override
+                public void run(){
+                    try {
+                        //Ensure messages are flushed even if no more sends are performed
+                        flushPending();
+                    } catch (Throwable t) {
+                        LOG.error("Uncaught throwable in pending message flusher thread, messages may be lost", t);
+                        throw new RuntimeException(t); // an exception escaping a fixed-rate task suppresses its later runs
+                    }
+                }
+            }, 5, 5, TimeUnit.SECONDS);
+        }
+
+        @Override
+        public void registerRecv(IConnectionCallback cb) { // a client never receives
+            throw new IllegalArgumentException("SHOULD NOT HAPPEN");
+        }
+
+        private void flushPending(){ // hands all queued messages to the server callback once one is registered
+            IConnectionCallback serverCb = _server._cb;
+            if (serverCb != null && !_pendingDueToUnregisteredServer.isEmpty()) {
+                ArrayList<TaskMessage> ret = new ArrayList<>();
+                _pendingDueToUnregisteredServer.drainTo(ret);
+                serverCb.recv(ret);
+            }
+        }
+
+        @Override
+        public void send(int taskId,  byte[] payload) { // deliver now if a callback exists, otherwise queue
+            TaskMessage message = new TaskMessage(taskId, payload);
+            IConnectionCallback serverCb = _server._cb;
+            if (serverCb != null) {
+                flushPending(); // queued messages go out before the new one
+                serverCb.recv(Arrays.asList(message));
+            } else {
+                _pendingDueToUnregisteredServer.add(message);
+            }
+        }
+
+        @Override
+        public void send(Iterator<TaskMessage> msgs) { // batch variant of send(int, byte[])
+            IConnectionCallback serverCb = _server._cb;
+            if (serverCb != null) {
+                flushPending();
+                ArrayList<TaskMessage> ret = new ArrayList<>();
+                while (msgs.hasNext()) {
+                    ret.add(msgs.next());
+                }
+                serverCb.recv(ret);
+            } else {
+                while(msgs.hasNext()){
+                    _pendingDueToUnregisteredServer.add(msgs.next());
+                }
+            }
+        }
+
+        @Override
+        public Map<Integer, Load> getLoad(Collection<Integer> tasks) {
+            return _server.getLoad(tasks);
+        }
+
+        @Override
+        public void sendLoadMetrics(Map<Integer, Double> taskToLoad) {
+            _server.sendLoadMetrics(taskToLoad);
+        }
+
+        @Override
+        public void close() { // stops the flusher, waiting up to 5 seconds for it to finish
+            _pendingFlusher.shutdown();
+            try{
+                _pendingFlusher.awaitTermination(5, TimeUnit.SECONDS);
+            } catch (InterruptedException e){
+                Thread.currentThread().interrupt(); // restore the interrupt status before propagating
+                throw new RuntimeException("Interrupted while awaiting flusher shutdown", e);
+            }
+        }
+    };
+
+    private static ConcurrentHashMap<String, LocalServer> _registry = new ConcurrentHashMap<>(); // "<nodeId>-<port>" -> server
+    private static LocalServer getLocalServer(String nodeId, int port) { // returns the one server for this key, creating it if absent
+        String key = nodeId + "-" + port;
+        LocalServer ret = _registry.get(key);
+        if (ret == null) {
+            ret = new LocalServer();
+            LocalServer tmp = _registry.putIfAbsent(key, ret);
+            if (tmp != null) {
+                ret = tmp; // another thread registered first; use its instance
+            }
+        }
+        return ret;
+    }
+
+    @SuppressWarnings("rawtypes")
+    @Override
+    public void prepare(Map storm_conf) {
+        //NOOP
+    }
+
+    @Override
+    public IConnection bind(String storm_id, int port) {
+        return getLocalServer(storm_id, port);
+    }
+
+    @Override
+    public IConnection connect(String storm_id, String host, int port) { // host is ignored; the server is looked up by storm_id and port
+        return new LocalClient(getLocalServer(storm_id, port));
+    }
+
+    @Override
+    public void term() {
+        //NOOP
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java
new file mode 100644
index 0000000..8463af6
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java
@@ -0,0 +1,612 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.messaging.netty;
+
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.Iterator;
+import java.util.Collection;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Timer;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.lang.InterruptedException;
+
+import org.apache.storm.Config;
+import org.apache.storm.grouping.Load;
+import org.apache.storm.messaging.ConnectionWithStatus;
+import org.apache.storm.messaging.TaskMessage;
+import org.apache.storm.messaging.IConnectionCallback;
+import org.apache.storm.metric.api.IStatefulObject;
+import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.StormBoundedExponentialBackoffRetry;
+import org.jboss.netty.bootstrap.ClientBootstrap;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
+import org.jboss.netty.util.HashedWheelTimer;
+import org.jboss.netty.util.Timeout;
+import org.jboss.netty.util.TimerTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static com.google.common.base.Preconditions.checkState;
+
+/**
+ * A Netty client for sending task messages to a remote destination (Netty server).
+ *
+ * Implementation details:
+ *
+ * - Sending messages, i.e. writing to the channel, is performed asynchronously.
+ * - Messages are sent in batches to optimize for network throughput at the expense of network latency.  The message
+ *   batch size is configurable.
+ * - Connecting and reconnecting are performed asynchronously.
+ *     - Note: The current implementation drops any messages that are being enqueued for sending if the connection to
+ *       the remote destination is currently unavailable.
+ */
+public class Client extends ConnectionWithStatus implements IStatefulObject, ISaslClient {
+    private static final long PENDING_MESSAGES_FLUSH_TIMEOUT_MS = 600000L; // NOTE(review): use not visible in this part of the class - confirm
+    private static final long PENDING_MESSAGES_FLUSH_INTERVAL_MS = 1000L;
+
+    private static final Logger LOG = LoggerFactory.getLogger(Client.class);
+    private static final String PREFIX = "Netty-Client-"; // prepended to the destination address to form dstAddressPrefixedName
+    private static final long NO_DELAY_MS = 0L; // delay used for the first connection attempt
+    private static final Timer timer = new Timer("Netty-ChannelAlive-Timer", true); // daemon timer shared by all clients
+
+    private final Map stormConf;
+    private final StormBoundedExponentialBackoffRetry retryPolicy; // built from the netty min/max sleep and max retries settings
+    private final ClientBootstrap bootstrap;
+    private final InetSocketAddress dstAddress; // remote destination (host and port given to the constructor)
+    protected final String dstAddressPrefixedName;
+    private volatile Map<Integer, Double> serverLoad = null;
+
+    /**
+     * The channel used for all write operations from this client to the remote destination.
+     */
+    private final AtomicReference<Channel> channelRef = new AtomicReference<>();
+
+    /**
+     * Total number of connection attempts.
+     */
+    private final AtomicInteger totalConnectionAttempts = new AtomicInteger(0);
+
+    /**
+     * Number of connection attempts since the last disconnect.
+     */
+    private final AtomicInteger connectionAttempts = new AtomicInteger(0);
+
+    /**
+     * Number of messages successfully sent to the remote destination.
+     */
+    private final AtomicInteger messagesSent = new AtomicInteger(0);
+
+    /**
+     * Number of messages that could not be sent to the remote destination.
+     */
+    private final AtomicInteger messagesLost = new AtomicInteger(0);
+
+    /**
+     * Interval, in milliseconds, between the periodic checks for a connected
+     * channel (performed in order to avoid loss of messages).
+     */
+    private static final long CHANNEL_ALIVE_INTERVAL_MS = 30000L;
+
+    /**
+     * Number of messages buffered in memory.
+     */
+    private final AtomicLong pendingMessages = new AtomicLong(0);
+
+    /**
+     * Whether the SASL channel is ready.
+     */
+    private final AtomicBoolean saslChannelReady = new AtomicBoolean(false);
+
+    /**
+     * This flag is set to true if and only if a client instance is being closed.
+     */
+    private volatile boolean closing = false;
+
+    private final Context context;
+
+    private final HashedWheelTimer scheduler; // runs the scheduled Connect tasks
+
+    private final MessageBuffer batcher; // accumulates outgoing messages into batches
+
+    private final Object writeLock = new Object(); // guards batcher use and the flushes made while sending
+
+    @SuppressWarnings("rawtypes")
+    Client(Map stormConf, ChannelFactory factory, HashedWheelTimer scheduler, String host, int port, Context context) {
+        this.stormConf = stormConf;
+        closing = false;
+        this.scheduler = scheduler;
+        this.context = context;
+        int bufferSize = ObjectReader.getInt(stormConf.get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE));
+        // if SASL authentication is disabled, saslChannelReady is initialized as true; otherwise false
+        saslChannelReady.set(!ObjectReader.getBoolean(stormConf.get(Config.STORM_MESSAGING_NETTY_AUTHENTICATION), false));
+        LOG.info("creating Netty Client, connecting to {}:{}, bufferSize: {}", host, port, bufferSize);
+        int messageBatchSize = ObjectReader.getInt(stormConf.get(Config.STORM_NETTY_MESSAGE_BATCH_SIZE), 262144);
+        // Create the batcher before `this` reaches the bootstrap or any background task, so no other thread sees it unset.
+        batcher = new MessageBuffer(messageBatchSize);
+        int maxReconnectionAttempts = ObjectReader.getInt(stormConf.get(Config.STORM_MESSAGING_NETTY_MAX_RETRIES));
+        int minWaitMs = ObjectReader.getInt(stormConf.get(Config.STORM_MESSAGING_NETTY_MIN_SLEEP_MS));
+        int maxWaitMs = ObjectReader.getInt(stormConf.get(Config.STORM_MESSAGING_NETTY_MAX_SLEEP_MS));
+        retryPolicy = new StormBoundedExponentialBackoffRetry(minWaitMs, maxWaitMs, maxReconnectionAttempts);
+
+        // Initiate connection to remote destination
+        bootstrap = createClientBootstrap(factory, bufferSize, stormConf);
+        dstAddress = new InetSocketAddress(host, port);
+        dstAddressPrefixedName = prefixedName(dstAddress);
+        launchChannelAliveThread(); // starts the periodic channel check on the shared timer
+        scheduleConnect(NO_DELAY_MS); // first connection attempt, without delay
+    }
+
+    /**
+     * Schedules a periodic task on the shared timer that checks this client's
+     * channel by calling getConnectedChannel(). The first run is immediate and
+     * later runs follow every CHANNEL_ALIVE_INTERVAL_MS; the task cancels
+     * itself once the client is closing.
+     */
+    private void launchChannelAliveThread() {
+        // netty TimerTask is already defined and hence a fully
+        // qualified name
+        timer.schedule(new java.util.TimerTask() {
+            public void run() {
+                try {
+                    LOG.debug("running timer task, address {}", dstAddress);
+                    if(closing) {
+                        this.cancel();
+                        return;
+                    }
+                    getConnectedChannel();
+                } catch (Exception exp) {
+                    LOG.error("channel connection error", exp); // exception passed as the throwable so its stack trace is logged
+                }
+            }
+        }, 0, CHANNEL_ALIVE_INTERVAL_MS);
+    }
+
+    private ClientBootstrap createClientBootstrap(ChannelFactory factory, int bufferSize, Map stormConf) { // Netty bootstrap with this client's socket options
+        ClientBootstrap bootstrap = new ClientBootstrap(factory);
+        bootstrap.setOption("tcpNoDelay", true);
+        bootstrap.setOption("sendBufferSize", bufferSize); // value read from Config.STORM_MESSAGING_NETTY_BUFFER_SIZE by the constructor
+        bootstrap.setOption("keepAlive", true);
+        bootstrap.setPipelineFactory(new StormClientPipelineFactory(this, stormConf)); // the pipeline factory gets a reference to this client
+        return bootstrap;
+    }
+
+    private String prefixedName(InetSocketAddress dstAddress) { // "Netty-Client-" + address, or "" for a null address
+        if (null != dstAddress) {
+            return PREFIX + dstAddress.toString();
+        }
+        return "";
+    }
+
+    /**
+     * Schedule a connection attempt to the remote destination after the given delay (in milliseconds).
+     */
+    private void scheduleConnect(long delayMs) {
+        scheduler.newTimeout(new Connect(dstAddress), delayMs, TimeUnit.MILLISECONDS);
+    }
+
+    private boolean reconnectingAllowed() { // reconnecting is permitted until the client starts closing
+        return !closing;
+    }
+
+    private boolean connectionEstablished(Channel channel) { // true only for a non-null, connected channel
+        // Because we are using TCP (which is a connection-oriented transport unlike UDP), a connection is only fully
+        // established iff the channel is connected.  That is, a TCP-based channel must be in the CONNECTED state before
+        // anything can be read or written to the channel.
+        //
+        // See:
+        // - http://netty.io/3.9/api/org/jboss/netty/channel/ChannelEvent.html
+        // - http://stackoverflow.com/questions/13356622/what-are-the-netty-channel-state-transitions
+        return channel != null && channel.isConnected();
+    }
+
+    /**
+     * Note:  Storm will check via this method whether a worker can be activated safely during the initial startup of a
+     * topology.  The worker will only be activated once all of its connections are ready.
+     */
+    @Override
+    public Status status() {
+        if (closing) {
+            return Status.Closed;
+        } else if (!connectionEstablished(channelRef.get())) {
+            return Status.Connecting;
+        } else {
+            if (saslChannelReady.get()) {
+                return Status.Ready;
+            } else {
+                return Status.Connecting; // need to wait until sasl channel is also ready
+            }
+        }
+    }
+
+    /**
+     * Receiving messages is not supported by a client.
+     *
+     * @throws java.lang.UnsupportedOperationException whenever this method is called.
+     */
+    @Override
+    public void registerRecv(IConnectionCallback cb) {
+        throw new UnsupportedOperationException("Client connection should not receive any messages");
+    }
+
+    @Override
+    public void sendLoadMetrics(Map<Integer, Double> taskToLoad) { // not supported on a client: always throws
+        throw new RuntimeException("Client connection should not send load metrics");
+    }
+
+    @Override
+    public void send(int taskId, byte[] payload) { // single-message convenience: wraps the message and delegates to send(Iterator)
+        TaskMessage msg = new TaskMessage(taskId, payload);
+        List<TaskMessage> wrapper = new ArrayList<TaskMessage>(1);
+        wrapper.add(msg);
+        send(wrapper.iterator());
+    }
+
+    /**
+     * Enqueue task messages to be sent to the remote destination (cf. `host` and `port`). Messages are discarded while closing and dropped when no channel is connected.
+     */
+    @Override
+    public void send(Iterator<TaskMessage> msgs) {
+        if (closing) {
+            int numMessages = iteratorSize(msgs);
+            LOG.error("discarding {} messages because the Netty client to {} is being closed", numMessages,
+                    dstAddressPrefixedName);
+            return;
+        }
+
+        if (!hasMessages(msgs)) {
+            return;
+        }
+
+        Channel channel = getConnectedChannel();
+        if (channel == null) {
+            /*
+             * Connection is unavailable. We will drop pending messages and let at-least-once message replay kick in.
+             *
+             * Another option would be to buffer the messages in memory.  But this option has the risk of causing OOM errors,
+             * especially for topologies that disable message acking because we don't know whether the connection recovery will
+             * succeed or not, and how long the recovery will take.
+             */
+            dropMessages(msgs);
+            return;
+        }
+
+        synchronized (writeLock) {
+            while (msgs.hasNext()) {
+                TaskMessage message = msgs.next();
+                MessageBatch full = batcher.add(message); // non-null when there is a batch to flush
+                if(full != null){
+                    flushMessages(channel, full);
+                }
+            }
+        }
+
+        if(channel.isWritable()){
+            synchronized (writeLock) {
+                // Netty's internal buffer is not full and we still have messages left in the buffer.
+                // We should write the unfilled MessageBatch immediately to reduce latency
+                MessageBatch batch = batcher.drain();
+                if(batch != null) {
+                    flushMessages(channel, batch);
+                }
+            }
+        } else {
+            // Channel's buffer is full, meaning that we have time to wait for other messages to arrive, and create a bigger
+            // batch. This yields better throughput.
+            // We can rely on `notifyInterestChanged` to push these messages as soon as there is space in Netty's buffer
+            // because we know `Channel.isWritable` was false after the messages were already in the buffer.
+        }
+    }
+
+    /**
+     * Returns the current channel if its connection is established; otherwise closes it, tries to
+     * schedule a reconnect and returns {@code null}.
+     */
+    private Channel getConnectedChannel() {
+        Channel current = channelRef.get();
+        if (connectionEstablished(current)) {
+            return current;
+        }
+        // Closing the channel and reconnecting should be done before handling the messages.
+        // Only the call that actually scheduled the reconnect logs, so the error is reported once.
+        if (closeChannelAndReconnect(current)) {
+            LOG.error("connection to {} is unavailable", dstAddressPrefixedName);
+        }
+        return null;
+    }
+
+    /**
+     * @return the destination address held in the {@code dstAddress} field
+     */
+    public InetSocketAddress getDstAddress() {
+        return dstAddress;
+    }
+
+    /**
+     * @return {@code true} only for a non-null iterator that still has at least one element
+     */
+    private boolean hasMessages(Iterator<TaskMessage> msgs) {
+        if (msgs == null) {
+            return false;
+        }
+        return msgs.hasNext();
+    }
+
+    /**
+     * Discards every remaining message of the iterator and adds their number to the lost-message counter.
+     */
+    private void dropMessages(Iterator<TaskMessage> msgs) {
+        // Counting walks the iterator to its end, which is what "empties" it.
+        messagesLost.getAndAdd(iteratorSize(msgs));
+    }
+
+    /**
+     * Counts the remaining elements of the iterator, consuming it in the process.
+     *
+     * @param msgs iterator to exhaust; may be {@code null}
+     * @return number of elements that were left, or 0 for {@code null}
+     */
+    private int iteratorSize(Iterator<TaskMessage> msgs) {
+        if (msgs == null) {
+            return 0;
+        }
+        int count = 0;
+        for (; msgs.hasNext(); msgs.next()) {
+            count++;
+        }
+        return count;
+    }
+
+    /**
+     * Asynchronously writes the message batch to the channel.
+     *
+     * If the write operation fails, then we will close the channel and trigger a reconnect.
+     *
+     * @param channel the channel to write to
+     * @param batch   the batch to write; a {@code null} or empty batch is ignored
+     */
+    private void flushMessages(Channel channel, final MessageBatch batch) {
+        if (null == batch || batch.isEmpty()) {
+            return;
+        }
+
+        // Snapshot the size once: the pending, sent and lost counters must all move by the same amount,
+        // so the completion callback below must not re-read the batch.
+        final int numMessages = batch.size();
+        LOG.debug("writing {} messages to channel {}", numMessages, channel.toString());
+        pendingMessages.addAndGet(numMessages);
+
+        ChannelFuture future = channel.write(batch);
+        future.addListener(new ChannelFutureListener() {
+            @Override
+            public void operationComplete(ChannelFuture future) throws Exception {
+                // Whatever the outcome, these messages are no longer in flight.
+                pendingMessages.addAndGet(-numMessages);
+                if (future.isSuccess()) {
+                    LOG.debug("sent {} messages to {}", numMessages, dstAddressPrefixedName);
+                    messagesSent.getAndAdd(numMessages);
+                } else {
+                    LOG.error("failed to send {} messages to {}: {}", numMessages, dstAddressPrefixedName,
+                            future.getCause());
+                    closeChannelAndReconnect(future.getChannel());
+                    messagesLost.getAndAdd(numMessages);
+                }
+            }
+
+        });
+    }
+
+    /**
+     * Closes the given channel and, if this call is the one that cleared it out of {@code channelRef},
+     * schedules a reconnect without delay.
+     *
+     * @param channel the channel to close; {@code null} is ignored
+     * @return {@code true} if this call scheduled a re-connect task, {@code false} otherwise
+     */
+    private boolean closeChannelAndReconnect(Channel channel) {
+        if (channel == null) {
+            return false;
+        }
+        channel.close();
+        // Only the caller that wins the compare-and-set may schedule the replacement connection.
+        if (!channelRef.compareAndSet(channel, null)) {
+            return false;
+        }
+        scheduleConnect(NO_DELAY_MS);
+        return true;
+    }
+
+    /**
+     * Gracefully close this client: deregister it from the context, stop reconnects, give pending
+     * messages a chance to be sent and finally close the channel. Does nothing if already closing.
+     */
+    @Override
+    public void close() {
+        if (closing) {
+            return;
+        }
+        LOG.info("closing Netty Client {}", dstAddressPrefixedName);
+        context.removeClient(dstAddress.getHostName(),dstAddress.getPort());
+        // From here on no further reconnection attempts are made.
+        closing = true;
+        waitForPendingMessagesToBeSent();
+        closeChannel();
+    }
+
+    /**
+     * Blocks until the pending-message counter reaches zero, the flush timeout elapses, or the calling
+     * thread is interrupted, polling at a fixed interval.
+     *
+     * On interruption the wait is abandoned and the thread's interrupt flag is restored so that callers
+     * further up the stack can still observe it.
+     */
+    private void waitForPendingMessagesToBeSent() {
+        // One snapshot is used for both log statements so they report the same starting count.
+        long totalPendingMsgs = pendingMessages.get();
+        LOG.info("waiting up to {} ms to send {} pending messages to {}",
+                PENDING_MESSAGES_FLUSH_TIMEOUT_MS, totalPendingMsgs, dstAddressPrefixedName);
+        long startMs = System.currentTimeMillis();
+        while (pendingMessages.get() != 0) {
+            try {
+                long deltaMs = System.currentTimeMillis() - startMs;
+                if (deltaMs > PENDING_MESSAGES_FLUSH_TIMEOUT_MS) {
+                    LOG.error("failed to send all pending messages to {} within timeout, {} of {} messages were not " +
+                            "sent", dstAddressPrefixedName, pendingMessages.get(), totalPendingMsgs);
+                    break;
+                }
+                Thread.sleep(PENDING_MESSAGES_FLUSH_INTERVAL_MS);
+            } catch (InterruptedException e) {
+                // Do not swallow the interrupt: stop waiting, but leave the flag set for the caller.
+                Thread.currentThread().interrupt();
+                break;
+            }
+        }
+
+    }
+
+    /**
+     * Closes the channel currently held in {@code channelRef}, if there is one.
+     */
+    private void closeChannel() {
+        Channel current = channelRef.get();
+        if (current == null) {
+            return;
+        }
+        current.close();
+        LOG.debug("channel to {} closed", dstAddressPrefixedName);
+    }
+
+    /**
+     * Replaces the cached server load map that {@code getLoad} reads from.
+     *
+     * @param taskToLoad map from task id to load; stored as-is (no copy is made)
+     */
+    void setLoadMetrics(Map<Integer, Double> taskToLoad) {
+        this.serverLoad = taskToLoad;
+    }
+
+    /**
+     * Builds the load entries for the requested tasks from the cached server load map.
+     *
+     * @param tasks task ids to look up
+     * @return a new map holding a {@code Load} for every requested task present in the cached server
+     *         load; empty when no server load has been cached yet
+     */
+    @Override
+    public Map<Integer, Load> getLoad(Collection<Integer> tasks) {
+        Map<Integer, Load> loads = new HashMap<Integer, Load>();
+        // Read the field once so the whole lookup works against a single map instance.
+        Map<Integer, Double> snapshot = serverLoad;
+        if (snapshot == null) {
+            return loads;
+        }
+        // Client-side load is the pending count, capped at 1024 and scaled into [0, 1].
+        double clientLoad = Math.min(pendingMessages.get(), 1024)/1024.0;
+        for (Integer task : tasks) {
+            Double serverSide = snapshot.get(task);
+            if (serverSide == null) {
+                continue;
+            }
+            loads.put(task, new Load(true, serverSide, clientLoad));
+        }
+        return loads;
+    }
+
+    /**
+     * Collects the connection metrics into a map. The "reconnects", "sent" and "lostOnSend" counters are
+     * reset to zero as they are read; "pending" is only read. "src" is included only when a local
+     * address is available.
+     */
+    @Override
+    public Object getState() {
+        LOG.debug("Getting metrics for client connection to {}", dstAddressPrefixedName);
+        HashMap<String, Object> metrics = new HashMap<String, Object>();
+        metrics.put("reconnects", totalConnectionAttempts.getAndSet(0));
+        metrics.put("sent", messagesSent.getAndSet(0));
+        metrics.put("pending", pendingMessages.get());
+        metrics.put("lostOnSend", messagesLost.getAndSet(0));
+        metrics.put("dest", dstAddress.toString());
+        String localAddress = srcAddressName();
+        if (localAddress == null) {
+            return metrics;
+        }
+        metrics.put("src", localAddress);
+        return metrics;
+    }
+
+    /**
+     * @return the configuration map held in {@code stormConf} (the same instance, not a copy)
+     */
+    public Map getConfig() {
+        return stormConf;
+    }
+
+    /** ISaslClient interface **/
+    /**
+     * Currently a no-op: the only statement in the body is commented out.
+     *
+     * @param channel unused
+     */
+    public void channelConnected(Channel channel) {
+//        setChannel(channel);
+    }
+
+    /**
+     * Marks the SASL channel as ready; {@code status()} reports {@code Ready} only once this flag is set.
+     */
+    public void channelReady() {
+        saslChannelReady.set(true);
+    }
+
+    /**
+     * @return the value stored under {@code Config.TOPOLOGY_NAME} in the configuration, cast to a String
+     */
+    public String name() {
+        return (String)stormConf.get(Config.TOPOLOGY_NAME);
+    }
+
+    /**
+     * @return the secret key that {@code SaslUtils.getSecretKey} derives from the configuration
+     */
+    public String secretKey() {
+        return SaslUtils.getSecretKey(stormConf);
+    }
+    /** end **/
+
+    /**
+     * @return the string form of the current channel's local address, or {@code null} when there is no
+     *         channel or the channel has no local address
+     */
+    private String srcAddressName() {
+        Channel current = channelRef.get();
+        if (current == null) {
+            return null;
+        }
+        SocketAddress local = current.getLocalAddress();
+        return (local == null) ? null : local.toString();
+    }
+
+    /**
+     * @return a short description of this client that includes the destination's prefixed name
+     */
+    @Override
+    public String toString() {
+        return String.format("Netty client for connecting to %s", dstAddressPrefixedName);
+    }
+
+    /**
+     * Called by Netty thread on change in channel interest. When the channel has become writable, any
+     * messages still sitting in the batcher are written out.
+     *
+     * @param channel the channel whose interest changed
+     */
+    public void notifyInterestChanged(Channel channel) {
+        if (!channel.isWritable()) {
+            return;
+        }
+        synchronized (writeLock) {
+            // Nothing may be pending; flushMessages ignores a null or empty batch.
+            flushMessages(channel, batcher.drain());
+        }
+    }
+
+    /**
+     * Asynchronously establishes a Netty connection to the remote address
+     * This task runs on a single thread shared among all clients, and thus
+     * should not perform operations that block.
+     *
+     * On success the new channel is published through {@code channelRef}; on failure another attempt is
+     * scheduled with a delay obtained from the retry policy. Once reconnecting is no longer allowed the
+     * client is closed and the task fails with a {@link RuntimeException}.
+     */
+    private class Connect implements TimerTask {
+
+        // Remote address every attempt of this task connects to.
+        private final InetSocketAddress address;
+
+        public Connect(InetSocketAddress address) {
+            this.address = address;
+        }
+
+        /**
+         * Logs the failed attempt and schedules the next one.
+         *
+         * @param t the cause of the failure, or {@code null} if none is known
+         */
+        private void reschedule(Throwable t) {
+            String baseMsg = String.format("connection attempt %s to %s failed", connectionAttempts,
+                    dstAddressPrefixedName);
+            String failureMsg = (t == null) ? baseMsg : baseMsg + ": " + t.toString();
+            LOG.error(failureMsg);
+            // The delay is derived from the number of attempts made so far.
+            long nextDelayMs = retryPolicy.getSleepTimeMs(connectionAttempts.get(), 0);
+            scheduleConnect(nextDelayMs);
+        }
+
+
+        /**
+         * Starts one connection attempt, or gives up if reconnecting is no longer allowed.
+         *
+         * @throws RuntimeException when reconnecting is not allowed; the client is closed first
+         */
+        @Override
+        public void run(Timeout timeout) throws Exception {
+            if (reconnectingAllowed()) {
+                // Both the per-connection and the metrics counter are bumped for every attempt.
+                final int connectionAttempt = connectionAttempts.getAndIncrement();
+                totalConnectionAttempts.getAndIncrement();
+
+                LOG.debug("connecting to {} [attempt {}]", address.toString(), connectionAttempt);
+                ChannelFuture future = bootstrap.connect(address);
+                future.addListener(new ChannelFutureListener() {
+                    @Override
+                    public void operationComplete(ChannelFuture future) throws Exception {
+                        // This call returns immediately
+                        Channel newChannel = future.getChannel();
+
+                        if (future.isSuccess() && connectionEstablished(newChannel)) {
+                            // The slot is expected to be empty here; checkState is given the result of the swap.
+                            boolean setChannel = channelRef.compareAndSet(null, newChannel);
+                            checkState(setChannel);
+                            LOG.debug("successfully connected to {}, {} [attempt {}]", address.toString(), newChannel.toString(),
+                                    connectionAttempt);
+                            if (messagesLost.get() > 0) {
+                                LOG.warn("Re-connection to {} was successful but {} messages has been lost so far", address.toString(), messagesLost.get());
+                            }
+                        } else {
+                            // Schedule the next attempt first, then discard the unusable channel.
+                            Throwable cause = future.getCause();
+                            reschedule(cause);
+                            if (newChannel != null) {
+                                newChannel.close();
+                            }
+                        }
+                    }
+                });
+            } else {
+                close();
+                throw new RuntimeException("Giving up to scheduleConnect to " + dstAddressPrefixedName + " after " +
+                        connectionAttempts + " failed attempts. " + messagesLost.get() + " messages were lost");
+
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/messaging/netty/Context.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/Context.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/Context.java
new file mode 100644
index 0000000..0f7c66b
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/Context.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.messaging.netty;
+
+import org.apache.storm.utils.ObjectReader;
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+import org.jboss.netty.util.HashedWheelTimer;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.storm.Config;
+import org.apache.storm.messaging.IConnection;
+import org.apache.storm.messaging.IContext;
+
+/**
+ * Netty-based {@link IContext}. Servers are created through {@link #bind(String, int)}; clients are
+ * created through {@link #connect(String, String, int)} and cached per {@code host:port}. All clients
+ * share one channel factory and one timer, both created in {@link #prepare(Map)}.
+ */
+public class Context implements IContext {
+    @SuppressWarnings("rawtypes")
+    private Map storm_conf;
+    // Registered connections, keyed by key(...); set to null once the context has been terminated.
+    private Map<String, IConnection> connections;
+    private NioClientSocketChannelFactory clientChannelFactory;
+
+    private HashedWheelTimer clientScheduleService;
+
+    /**
+     * initialization per Storm configuration
+     *
+     * @param storm_conf the configuration; {@code Config.STORM_MESSAGING_NETTY_CLIENT_WORKER_THREADS} is
+     *                   read from it to size the client worker pool
+     */
+    @SuppressWarnings("rawtypes")
+    public void prepare(Map storm_conf) {
+        this.storm_conf = storm_conf;
+        connections = new HashMap<>();
+
+        //each context will have a single client channel factory
+        int maxWorkers = ObjectReader.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_CLIENT_WORKER_THREADS));
+        ThreadFactory bossFactory = new NettyRenameThreadFactory("client" + "-boss");
+        ThreadFactory workerFactory = new NettyRenameThreadFactory("client" + "-worker");
+        if (maxWorkers > 0) {
+            clientChannelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(bossFactory),
+                    Executors.newCachedThreadPool(workerFactory), maxWorkers);
+        } else {
+            // A non-positive setting means: let the factory pick its default worker count.
+            clientChannelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(bossFactory),
+                    Executors.newCachedThreadPool(workerFactory));
+        }
+
+        clientScheduleService = new HashedWheelTimer(new NettyRenameThreadFactory("client-schedule-service"));
+    }
+
+    /**
+     * establish a server with a binding port
+     *
+     * @param storm_id used, together with the port, as the registration key of the server
+     * @param port     the port handed to the new server
+     * @return the newly created server connection
+     */
+    public synchronized IConnection bind(String storm_id, int port) {
+        IConnection server = new Server(storm_conf, port);
+        connections.put(key(storm_id, port), server);
+        return server;
+    }
+
+    /**
+     * establish a connection to a remote server
+     *
+     * @param storm_id not used by this implementation
+     * @param host     remote host
+     * @param port     remote port
+     * @return the connection already registered for {@code host:port}, or a newly created client
+     */
+    public synchronized IConnection connect(String storm_id, String host, int port) {
+        IConnection connection = connections.get(key(host,port));
+        if(connection !=null)
+        {
+            return connection;
+        }
+        IConnection client =  new Client(storm_conf, clientChannelFactory,
+                clientScheduleService, host, port, this);
+        connections.put(key(host, port), client);
+        return client;
+    }
+
+    /**
+     * Forgets the connection registered for {@code host:port}. Does nothing once the context has been
+     * terminated.
+     */
+    synchronized void removeClient(String host, int port) {
+        if (connections != null) {
+            connections.remove(key(host, port));
+        }
+    }
+
+    /**
+     * terminate this context
+     */
+    public synchronized void term() {
+        clientScheduleService.stop();
+
+        // Detach the map before closing anything: closing a client calls back into removeClient(), and
+        // removing entries from the map while it is being iterated can fail with a
+        // ConcurrentModificationException. With the field already null, removeClient() is a no-op.
+        Map<String, IConnection> toClose = connections;
+        connections = null;
+        if (toClose != null) {
+            for (IConnection conn : toClose.values()) {
+                conn.close();
+            }
+        }
+
+        //we need to release resources associated with client channel factory
+        clientChannelFactory.releaseExternalResources();
+
+    }
+
+    // Builds the registration key "<host>:<port>".
+    private String key(String host, int port) {
+        return String.format("%s:%d", host, port);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/messaging/netty/ControlMessage.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/ControlMessage.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/ControlMessage.java
new file mode 100644
index 0000000..3c7aaba
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/ControlMessage.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.messaging.netty;
+
+import java.io.IOException;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBufferOutputStream;
+import org.jboss.netty.buffer.ChannelBuffers;
+
+/**
+ * Control messages of the Netty transport. Each constant is identified by a fixed {@code short} code,
+ * which is also its complete encoded form (two bytes).
+ */
+public enum ControlMessage implements INettySerializable {
+    CLOSE_MESSAGE((short)-100),
+    EOB_MESSAGE((short)-201),
+    OK_RESPONSE((short)-200),
+    FAILURE_RESPONSE((short)-400),
+    SASL_TOKEN_MESSAGE_REQUEST((short)-202),
+    SASL_COMPLETE_REQUEST((short)-203);
+
+    // Assigned once per constant and never changed afterwards.
+    private final short code;
+
+    //private constructor
+    private ControlMessage(short code) {
+        this.code = code;
+    }
+
+    /**
+     * @param encoded status code
+     * @return a control message per an encoded status code, or {@code null} if no constant uses that code
+     */
+    public static ControlMessage mkMessage(short encoded) {
+        for (ControlMessage cm : ControlMessage.values()) {
+            if (encoded == cm.code) {
+                return cm;
+            }
+        }
+        return null;
+    }
+
+    /**
+     * @return the encoded size: always 2, the size of the short code
+     */
+    @Override
+    public int encodeLength() {
+        return 2; //short
+    }
+
+    /**
+     * encode the current Control Message into a channel buffer
+     *
+     * @return a direct buffer containing the code of this message
+     * @throws IOException if writing to or closing the underlying stream fails
+     */
+    @Override
+    public ChannelBuffer buffer() throws IOException {
+        ChannelBufferOutputStream bout = new ChannelBufferOutputStream(ChannelBuffers.directBuffer(encodeLength()));
+        write(bout);
+        bout.close();
+        return bout.buffer();
+    }
+
+    /**
+     * Decodes a control message from the short at the start of the given bytes.
+     *
+     * @param serial the serialized message
+     * @return the matching control message, or {@code null} if the code is unknown
+     */
+    public static ControlMessage read(byte[] serial) {
+        ChannelBuffer cm_buffer = ChannelBuffers.copiedBuffer(serial);
+        return mkMessage(cm_buffer.getShort(0));
+    }
+
+    /**
+     * Writes the code of this message to the given stream.
+     *
+     * @throws IOException if the stream cannot be written
+     */
+    public void write(ChannelBufferOutputStream bout) throws IOException {
+        bout.writeShort(code);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/messaging/netty/INettySerializable.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/INettySerializable.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/INettySerializable.java
new file mode 100644
index 0000000..0a0236f
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/INettySerializable.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.messaging.netty;
+
+import java.io.IOException;
+import org.jboss.netty.buffer.ChannelBuffer;
+
+/**
+ * A message that can encode itself into a Netty {@link ChannelBuffer}.
+ */
+public interface INettySerializable {
+    /**
+     * @return this message encoded into a channel buffer
+     * @throws IOException if encoding fails
+     */
+    ChannelBuffer buffer() throws IOException;
+
+    /**
+     * @return the length of the encoded message; presumably in bytes (ControlMessage returns 2 for its
+     *         short code) -- TODO confirm against the other implementations
+     */
+    int encodeLength();
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/messaging/netty/ISaslClient.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/ISaslClient.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/ISaslClient.java
new file mode 100644
index 0000000..681c199
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/ISaslClient.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.messaging.netty;
+
+import org.jboss.netty.channel.Channel;
+import org.apache.storm.Config;
+
+/**
+ * Callbacks and credentials for the client side of a SASL-authenticated channel.
+ * NOTE(review): who invokes these callbacks is not visible here -- presumably the SASL client
+ * handlers; confirm.
+ */
+public interface ISaslClient {
+    /** Notification that the given channel has been connected. */
+    void channelConnected(Channel channel);
+
+    /** Notification that the channel is ready for use; presumably sent once SASL completes -- confirm. */
+    void channelReady();
+
+    /** @return the name of this client; presumably used as the SASL identity -- confirm */
+    String name();
+
+    /** @return the secret key of this client */
+    String secretKey();
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/messaging/netty/ISaslServer.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/ISaslServer.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/ISaslServer.java
new file mode 100644
index 0000000..997dbeb
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/ISaslServer.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.messaging.netty;
+
+import org.jboss.netty.channel.Channel;
+
+/**
+ * An {@link IServer} that additionally exposes the name and secret key used for SASL and is told when
+ * a channel has been authenticated.
+ */
+public interface ISaslServer extends IServer {
+    /** @return the name of this server; presumably used as the SASL identity -- confirm */
+    String name();
+
+    /** @return the secret key of this server */
+    String secretKey();
+
+    /** Notification that the given channel has been authenticated. */
+    void authenticated(Channel c);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/messaging/netty/IServer.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/IServer.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/IServer.java
new file mode 100644
index 0000000..b04d715
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/IServer.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.messaging.netty;
+
+import org.jboss.netty.channel.Channel;
+
+/**
+ * Callbacks for the server side of the Netty transport.
+ * NOTE(review): presumably invoked by the server's channel handlers -- confirm.
+ */
+public interface IServer {
+    /** Notification that the given channel has been connected. */
+    void channelConnected(Channel c);
+
+    /**
+     * Hands a received message to the server.
+     *
+     * @param message the received message
+     * @param remote  identifies the sender; exact format is not visible here -- confirm
+     * @param channel the channel the message arrived on
+     * @throws InterruptedException if the calling thread is interrupted while handling the message
+     */
+    void received(Object message, String remote, Channel channel) throws InterruptedException;
+
+    /** Asks the server to close the given channel. */
+    void closeChannel(Channel c);
+}