You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by kn...@apache.org on 2015/11/23 22:07:42 UTC

[03/37] storm git commit: PACEMAKER OPEN SOURCE!

http://git-wip-us.apache.org/repos/asf/storm/blob/444ec05e/storm-core/src/jvm/backtype/storm/generated/NotAliveException.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/generated/NotAliveException.java b/storm-core/src/jvm/backtype/storm/generated/NotAliveException.java
index cbabcf9..63c3e4c 100644
--- a/storm-core/src/jvm/backtype/storm/generated/NotAliveException.java
+++ b/storm-core/src/jvm/backtype/storm/generated/NotAliveException.java
@@ -51,7 +51,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
-@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-10-9")
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-10-30")
 public class NotAliveException extends TException implements org.apache.thrift.TBase<NotAliveException, NotAliveException._Fields>, java.io.Serializable, Cloneable, Comparable<NotAliveException> {
   private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("NotAliveException");
 

http://git-wip-us.apache.org/repos/asf/storm/blob/444ec05e/storm-core/src/jvm/backtype/storm/generated/NullStruct.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/generated/NullStruct.java b/storm-core/src/jvm/backtype/storm/generated/NullStruct.java
index 1b8208c..98dd8b9 100644
--- a/storm-core/src/jvm/backtype/storm/generated/NullStruct.java
+++ b/storm-core/src/jvm/backtype/storm/generated/NullStruct.java
@@ -51,7 +51,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
-@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-10-9")
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-10-30")
 public class NullStruct implements org.apache.thrift.TBase<NullStruct, NullStruct._Fields>, java.io.Serializable, Cloneable, Comparable<NullStruct> {
   private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("NullStruct");
 

http://git-wip-us.apache.org/repos/asf/storm/blob/444ec05e/storm-core/src/jvm/backtype/storm/generated/RebalanceOptions.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/generated/RebalanceOptions.java b/storm-core/src/jvm/backtype/storm/generated/RebalanceOptions.java
index d859f5a..b3f916b 100644
--- a/storm-core/src/jvm/backtype/storm/generated/RebalanceOptions.java
+++ b/storm-core/src/jvm/backtype/storm/generated/RebalanceOptions.java
@@ -51,7 +51,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
-@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-10-9")
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-10-30")
 public class RebalanceOptions implements org.apache.thrift.TBase<RebalanceOptions, RebalanceOptions._Fields>, java.io.Serializable, Cloneable, Comparable<RebalanceOptions> {
   private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RebalanceOptions");
 

http://git-wip-us.apache.org/repos/asf/storm/blob/444ec05e/storm-core/src/jvm/backtype/storm/generated/ShellComponent.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/generated/ShellComponent.java b/storm-core/src/jvm/backtype/storm/generated/ShellComponent.java
index ab86c6a..8647419 100644
--- a/storm-core/src/jvm/backtype/storm/generated/ShellComponent.java
+++ b/storm-core/src/jvm/backtype/storm/generated/ShellComponent.java
@@ -51,7 +51,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
-@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-10-9")
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-10-30")
 public class ShellComponent implements org.apache.thrift.TBase<ShellComponent, ShellComponent._Fields>, java.io.Serializable, Cloneable, Comparable<ShellComponent> {
   private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("ShellComponent");
 

http://git-wip-us.apache.org/repos/asf/storm/blob/444ec05e/storm-core/src/jvm/backtype/storm/generated/SpoutAggregateStats.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/generated/SpoutAggregateStats.java b/storm-core/src/jvm/backtype/storm/generated/SpoutAggregateStats.java
index a8d6ec7..bc128aa 100644
--- a/storm-core/src/jvm/backtype/storm/generated/SpoutAggregateStats.java
+++ b/storm-core/src/jvm/backtype/storm/generated/SpoutAggregateStats.java
@@ -51,7 +51,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
-@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-10-9")
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-10-30")
 public class SpoutAggregateStats implements org.apache.thrift.TBase<SpoutAggregateStats, SpoutAggregateStats._Fields>, java.io.Serializable, Cloneable, Comparable<SpoutAggregateStats> {
   private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("SpoutAggregateStats");
 

http://git-wip-us.apache.org/repos/asf/storm/blob/444ec05e/storm-core/src/jvm/backtype/storm/generated/SpoutSpec.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/generated/SpoutSpec.java b/storm-core/src/jvm/backtype/storm/generated/SpoutSpec.java
index 3fc45cf..bb67050 100644
--- a/storm-core/src/jvm/backtype/storm/generated/SpoutSpec.java
+++ b/storm-core/src/jvm/backtype/storm/generated/SpoutSpec.java
@@ -51,7 +51,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
-@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-10-9")
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-10-30")
 public class SpoutSpec implements org.apache.thrift.TBase<SpoutSpec, SpoutSpec._Fields>, java.io.Serializable, Cloneable, Comparable<SpoutSpec> {
   private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("SpoutSpec");
 

http://git-wip-us.apache.org/repos/asf/storm/blob/444ec05e/storm-core/src/jvm/backtype/storm/generated/SpoutStats.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/generated/SpoutStats.java b/storm-core/src/jvm/backtype/storm/generated/SpoutStats.java
index 478143f..d744184 100644
--- a/storm-core/src/jvm/backtype/storm/generated/SpoutStats.java
+++ b/storm-core/src/jvm/backtype/storm/generated/SpoutStats.java
@@ -51,7 +51,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
-@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-10-9")
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-10-30")
 public class SpoutStats implements org.apache.thrift.TBase<SpoutStats, SpoutStats._Fields>, java.io.Serializable, Cloneable, Comparable<SpoutStats> {
   private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("SpoutStats");
 

http://git-wip-us.apache.org/repos/asf/storm/blob/444ec05e/storm-core/src/jvm/backtype/storm/generated/StateSpoutSpec.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/generated/StateSpoutSpec.java b/storm-core/src/jvm/backtype/storm/generated/StateSpoutSpec.java
index 530b7ca..1e5ffde 100644
--- a/storm-core/src/jvm/backtype/storm/generated/StateSpoutSpec.java
+++ b/storm-core/src/jvm/backtype/storm/generated/StateSpoutSpec.java
@@ -51,7 +51,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
-@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-10-9")
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-10-30")
 public class StateSpoutSpec implements org.apache.thrift.TBase<StateSpoutSpec, StateSpoutSpec._Fields>, java.io.Serializable, Cloneable, Comparable<StateSpoutSpec> {
   private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("StateSpoutSpec");
 

http://git-wip-us.apache.org/repos/asf/storm/blob/444ec05e/storm-core/src/jvm/backtype/storm/generated/StormBase.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/generated/StormBase.java b/storm-core/src/jvm/backtype/storm/generated/StormBase.java
index f4af67a..6eed480 100644
--- a/storm-core/src/jvm/backtype/storm/generated/StormBase.java
+++ b/storm-core/src/jvm/backtype/storm/generated/StormBase.java
@@ -51,7 +51,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
-@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-10-9")
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-10-30")
 public class StormBase implements org.apache.thrift.TBase<StormBase, StormBase._Fields>, java.io.Serializable, Cloneable, Comparable<StormBase> {
   private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("StormBase");
 

http://git-wip-us.apache.org/repos/asf/storm/blob/444ec05e/storm-core/src/jvm/backtype/storm/generated/StormTopology.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/generated/StormTopology.java b/storm-core/src/jvm/backtype/storm/generated/StormTopology.java
index 9b96fa3..eb74a18 100644
--- a/storm-core/src/jvm/backtype/storm/generated/StormTopology.java
+++ b/storm-core/src/jvm/backtype/storm/generated/StormTopology.java
@@ -51,7 +51,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
-@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-10-9")
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-10-30")
 public class StormTopology implements org.apache.thrift.TBase<StormTopology, StormTopology._Fields>, java.io.Serializable, Cloneable, Comparable<StormTopology> {
   private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("StormTopology");
 

http://git-wip-us.apache.org/repos/asf/storm/blob/444ec05e/storm-core/src/jvm/backtype/storm/generated/StreamInfo.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/generated/StreamInfo.java b/storm-core/src/jvm/backtype/storm/generated/StreamInfo.java
index e3b0fdb..55b265a 100644
--- a/storm-core/src/jvm/backtype/storm/generated/StreamInfo.java
+++ b/storm-core/src/jvm/backtype/storm/generated/StreamInfo.java
@@ -51,7 +51,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
-@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-10-9")
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-10-30")
 public class StreamInfo implements org.apache.thrift.TBase<StreamInfo, StreamInfo._Fields>, java.io.Serializable, Cloneable, Comparable<StreamInfo> {
   private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("StreamInfo");
 

http://git-wip-us.apache.org/repos/asf/storm/blob/444ec05e/storm-core/src/jvm/backtype/storm/generated/SubmitOptions.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/generated/SubmitOptions.java b/storm-core/src/jvm/backtype/storm/generated/SubmitOptions.java
index 358468a..1633361 100644
--- a/storm-core/src/jvm/backtype/storm/generated/SubmitOptions.java
+++ b/storm-core/src/jvm/backtype/storm/generated/SubmitOptions.java
@@ -51,7 +51,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
-@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-10-9")
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-10-30")
 public class SubmitOptions implements org.apache.thrift.TBase<SubmitOptions, SubmitOptions._Fields>, java.io.Serializable, Cloneable, Comparable<SubmitOptions> {
   private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("SubmitOptions");
 

http://git-wip-us.apache.org/repos/asf/storm/blob/444ec05e/storm-core/src/jvm/backtype/storm/generated/SupervisorInfo.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/generated/SupervisorInfo.java b/storm-core/src/jvm/backtype/storm/generated/SupervisorInfo.java
index 6d68927..9bcb567 100644
--- a/storm-core/src/jvm/backtype/storm/generated/SupervisorInfo.java
+++ b/storm-core/src/jvm/backtype/storm/generated/SupervisorInfo.java
@@ -51,7 +51,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
-@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-10-9")
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-10-30")
 public class SupervisorInfo implements org.apache.thrift.TBase<SupervisorInfo, SupervisorInfo._Fields>, java.io.Serializable, Cloneable, Comparable<SupervisorInfo> {
   private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("SupervisorInfo");
 

http://git-wip-us.apache.org/repos/asf/storm/blob/444ec05e/storm-core/src/jvm/backtype/storm/generated/SupervisorSummary.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/generated/SupervisorSummary.java b/storm-core/src/jvm/backtype/storm/generated/SupervisorSummary.java
index 7e36d0f..022ecb4 100644
--- a/storm-core/src/jvm/backtype/storm/generated/SupervisorSummary.java
+++ b/storm-core/src/jvm/backtype/storm/generated/SupervisorSummary.java
@@ -51,7 +51,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
-@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-10-9")
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-10-30")
 public class SupervisorSummary implements org.apache.thrift.TBase<SupervisorSummary, SupervisorSummary._Fields>, java.io.Serializable, Cloneable, Comparable<SupervisorSummary> {
   private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("SupervisorSummary");
 

http://git-wip-us.apache.org/repos/asf/storm/blob/444ec05e/storm-core/src/jvm/backtype/storm/generated/ThriftSerializedObject.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/generated/ThriftSerializedObject.java b/storm-core/src/jvm/backtype/storm/generated/ThriftSerializedObject.java
index 4b2bc63..e233458 100644
--- a/storm-core/src/jvm/backtype/storm/generated/ThriftSerializedObject.java
+++ b/storm-core/src/jvm/backtype/storm/generated/ThriftSerializedObject.java
@@ -51,7 +51,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
-@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-10-9")
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-10-30")
 public class ThriftSerializedObject implements org.apache.thrift.TBase<ThriftSerializedObject, ThriftSerializedObject._Fields>, java.io.Serializable, Cloneable, Comparable<ThriftSerializedObject> {
   private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("ThriftSerializedObject");
 

http://git-wip-us.apache.org/repos/asf/storm/blob/444ec05e/storm-core/src/jvm/backtype/storm/generated/TopologyInfo.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/generated/TopologyInfo.java b/storm-core/src/jvm/backtype/storm/generated/TopologyInfo.java
index 4f78417..c81eac5 100644
--- a/storm-core/src/jvm/backtype/storm/generated/TopologyInfo.java
+++ b/storm-core/src/jvm/backtype/storm/generated/TopologyInfo.java
@@ -51,7 +51,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
-@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-10-9")
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-10-30")
 public class TopologyInfo implements org.apache.thrift.TBase<TopologyInfo, TopologyInfo._Fields>, java.io.Serializable, Cloneable, Comparable<TopologyInfo> {
   private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("TopologyInfo");
 

http://git-wip-us.apache.org/repos/asf/storm/blob/444ec05e/storm-core/src/jvm/backtype/storm/generated/TopologyPageInfo.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/generated/TopologyPageInfo.java b/storm-core/src/jvm/backtype/storm/generated/TopologyPageInfo.java
index 180b608..ad29c7d 100644
--- a/storm-core/src/jvm/backtype/storm/generated/TopologyPageInfo.java
+++ b/storm-core/src/jvm/backtype/storm/generated/TopologyPageInfo.java
@@ -51,7 +51,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
-@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-10-9")
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-10-30")
 public class TopologyPageInfo implements org.apache.thrift.TBase<TopologyPageInfo, TopologyPageInfo._Fields>, java.io.Serializable, Cloneable, Comparable<TopologyPageInfo> {
   private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("TopologyPageInfo");
 

http://git-wip-us.apache.org/repos/asf/storm/blob/444ec05e/storm-core/src/jvm/backtype/storm/generated/TopologyStats.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/generated/TopologyStats.java b/storm-core/src/jvm/backtype/storm/generated/TopologyStats.java
index 0ff01de..ded0010 100644
--- a/storm-core/src/jvm/backtype/storm/generated/TopologyStats.java
+++ b/storm-core/src/jvm/backtype/storm/generated/TopologyStats.java
@@ -51,7 +51,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
-@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-10-9")
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-10-30")
 public class TopologyStats implements org.apache.thrift.TBase<TopologyStats, TopologyStats._Fields>, java.io.Serializable, Cloneable, Comparable<TopologyStats> {
   private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("TopologyStats");
 

http://git-wip-us.apache.org/repos/asf/storm/blob/444ec05e/storm-core/src/jvm/backtype/storm/generated/TopologySummary.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/generated/TopologySummary.java b/storm-core/src/jvm/backtype/storm/generated/TopologySummary.java
index 055a01a..cfa5e24 100644
--- a/storm-core/src/jvm/backtype/storm/generated/TopologySummary.java
+++ b/storm-core/src/jvm/backtype/storm/generated/TopologySummary.java
@@ -51,7 +51,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
-@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-10-9")
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-10-30")
 public class TopologySummary implements org.apache.thrift.TBase<TopologySummary, TopologySummary._Fields>, java.io.Serializable, Cloneable, Comparable<TopologySummary> {
   private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("TopologySummary");
 

http://git-wip-us.apache.org/repos/asf/storm/blob/444ec05e/storm-core/src/jvm/backtype/storm/generated/WorkerResources.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/generated/WorkerResources.java b/storm-core/src/jvm/backtype/storm/generated/WorkerResources.java
index 7cfadd7..2ab462f 100644
--- a/storm-core/src/jvm/backtype/storm/generated/WorkerResources.java
+++ b/storm-core/src/jvm/backtype/storm/generated/WorkerResources.java
@@ -51,7 +51,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
-@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-10-9")
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-10-30")
 public class WorkerResources implements org.apache.thrift.TBase<WorkerResources, WorkerResources._Fields>, java.io.Serializable, Cloneable, Comparable<WorkerResources> {
   private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("WorkerResources");
 

http://git-wip-us.apache.org/repos/asf/storm/blob/444ec05e/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
index 2149c0d..7ecd770 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
@@ -17,6 +17,17 @@
  */
 package backtype.storm.messaging.netty;
 
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.Iterator;
+import java.util.Collection;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.lang.InterruptedException;
+
 import backtype.storm.Config;
 import backtype.storm.messaging.ConnectionWithStatus;
 import backtype.storm.messaging.TaskMessage;
@@ -34,17 +45,11 @@ import org.jboss.netty.util.TimerTask;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
+
 import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
 
 import static com.google.common.base.Preconditions.checkState;
 
@@ -60,7 +65,7 @@ import static com.google.common.base.Preconditions.checkState;
  *     - Note: The current implementation drops any messages that are being enqueued for sending if the connection to
  *       the remote destination is currently unavailable.
  */
-public class Client extends ConnectionWithStatus implements IStatefulObject {
+public class Client extends ConnectionWithStatus implements IStatefulObject, ISaslClient {
     private static final long PENDING_MESSAGES_FLUSH_TIMEOUT_MS = 600000L;
     private static final long PENDING_MESSAGES_FLUSH_INTERVAL_MS = 1000L;
 
@@ -73,7 +78,7 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
     private final ClientBootstrap bootstrap;
     private final InetSocketAddress dstAddress;
     protected final String dstAddressPrefixedName;
-
+    
     /**
      * The channel used for all write operations from this client to the remote destination.
      */
@@ -104,6 +109,10 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
      */
     private final AtomicLong pendingMessages = new AtomicLong(0);
 
+    /**
+     * Whether the SASL channel is ready.
+     */
+    private final AtomicBoolean saslChannelReady = new AtomicBoolean(false);
 
     /**
      * This flag is set to true if and only if a client instance is being closed.
@@ -125,6 +134,8 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
         this.scheduler = scheduler;
         this.context = context;
         int bufferSize = Utils.getInt(stormConf.get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE));
+        // if SASL authentication is disabled, saslChannelReady is initialized as true; otherwise false
+        saslChannelReady.set(!Utils.getBoolean(stormConf.get(Config.STORM_MESSAGING_NETTY_AUTHENTICATION), false));
         LOG.info("creating Netty Client, connecting to {}:{}, bufferSize: {}", host, port, bufferSize);
         int messageBatchSize = Utils.getInt(stormConf.get(Config.STORM_NETTY_MESSAGE_BATCH_SIZE), 262144);
 
@@ -134,19 +145,19 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
         retryPolicy = new StormBoundedExponentialBackoffRetry(minWaitMs, maxWaitMs, maxReconnectionAttempts);
 
         // Initiate connection to remote destination
-        bootstrap = createClientBootstrap(factory, bufferSize);
+        bootstrap = createClientBootstrap(factory, bufferSize, stormConf);
         dstAddress = new InetSocketAddress(host, port);
         dstAddressPrefixedName = prefixedName(dstAddress);
         scheduleConnect(NO_DELAY_MS);
         batcher = new MessageBuffer(messageBatchSize);
     }
 
-    private ClientBootstrap createClientBootstrap(ChannelFactory factory, int bufferSize) {
+    private ClientBootstrap createClientBootstrap(ChannelFactory factory, int bufferSize, Map stormConf) {
         ClientBootstrap bootstrap = new ClientBootstrap(factory);
         bootstrap.setOption("tcpNoDelay", true);
         bootstrap.setOption("sendBufferSize", bufferSize);
         bootstrap.setOption("keepAlive", true);
-        bootstrap.setPipelineFactory(new StormClientPipelineFactory(this));
+        bootstrap.setPipelineFactory(new StormClientPipelineFactory(this, stormConf));
         return bootstrap;
     }
 
@@ -158,7 +169,7 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
     }
 
     /**
-     * We will retry connection with exponential back-off policy
+     * Enqueue a task message to be sent to server
      */
     private void scheduleConnect(long delayMs) {
         scheduler.newTimeout(new Connect(dstAddress), delayMs, TimeUnit.MILLISECONDS);
@@ -190,7 +201,11 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
         } else if (!connectionEstablished(channelRef.get())) {
             return Status.Connecting;
         } else {
-            return Status.Ready;
+            if (saslChannelReady.get()) {
+                return Status.Ready;
+            } else {
+                return Status.Connecting; // need to wait until sasl channel is also ready
+            }
         }
     }
 
@@ -202,7 +217,7 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
     @Override
     public Iterator<TaskMessage> recv(int flags, int clientId) {
         throw new UnsupportedOperationException("Client connection should not receive any messages");
-    }
+        }
 
     @Override
     public void send(int taskId, byte[] payload) {
@@ -225,7 +240,7 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
         }
 
         if (!hasMessages(msgs)) {
-            return;
+          return;
         }
 
         Channel channel = getConnectedChannel();
@@ -266,7 +281,7 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
             // We can rely on `notifyInterestChanged` to push these messages as soon as there is spece in Netty's buffer
             // because we know `Channel.isWritable` was false after the messages were already in the buffer.
         }
-    }
+        }
 
     private Channel getConnectedChannel() {
         Channel channel = channelRef.get();
@@ -281,6 +296,10 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
             }
             return null;
         }
+        }
+
+    public InetSocketAddress getDstAddress() {
+        return dstAddress;
     }
 
     private boolean hasMessages(Iterator<TaskMessage> msgs) {
@@ -292,7 +311,7 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
         // We consume the iterator by traversing and thus "emptying" it.
         int msgCount = iteratorSize(msgs);
         messagesLost.getAndAdd(msgCount);
-    }
+                    }
 
     private int iteratorSize(Iterator<TaskMessage> msgs) {
         int size = 0;
@@ -300,8 +319,8 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
             while (msgs.hasNext()) {
                 size++;
                 msgs.next();
+                }
             }
-        }
         return size;
     }
 
@@ -335,7 +354,7 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
             }
 
         });
-    }
+        }
 
     /**
      * Schedule a reconnect if we closed a non-null channel, and acquired the right to
@@ -375,7 +394,7 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
         long totalPendingMsgs = pendingMessages.get();
         long startMs = System.currentTimeMillis();
         while (pendingMessages.get() != 0) {
-            try {
+        try {
                 long deltaMs = System.currentTimeMillis() - startMs;
                 if (deltaMs > PENDING_MESSAGES_FLUSH_TIMEOUT_MS) {
                     LOG.error("failed to send all pending messages to {} within timeout, {} of {} messages were not " +
@@ -386,11 +405,10 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
             }
             catch (InterruptedException e) {
                 break;
-            }
         }
-
     }
 
+    }
 
     private void closeChannel() {
         Channel channel = channelRef.get();
@@ -402,7 +420,7 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
 
     @Override
     public Object getState() {
-        LOG.info("Getting metrics for client connection to {}", dstAddressPrefixedName);
+        LOG.debug("Getting metrics for client connection to {}", dstAddressPrefixedName);
         HashMap<String, Object> ret = new HashMap<String, Object>();
         ret.put("reconnects", totalConnectionAttempts.getAndSet(0));
         ret.put("sent", messagesSent.getAndSet(0));
@@ -416,10 +434,28 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
         return ret;
     }
 
-    public Map getStormConf() {
+    public Map getConfig() {
         return stormConf;
     }
 
+    /** ISaslClient interface **/
+    public void channelConnected(Channel channel) {
+//        setChannel(channel);
+        }
+
+    public void channelReady() {
+        saslChannelReady.set(true);
+    }
+
+    public String name() {
+        return (String)stormConf.get(Config.TOPOLOGY_NAME);
+    }
+
+    public String secretKey() {
+        return SaslUtils.getSecretKey(stormConf);
+    }
+    /** end **/
+
     private String srcAddressName() {
         String name = null;
         Channel channel = channelRef.get();
@@ -495,7 +531,7 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
                                     connectionAttempt);
                             if (messagesLost.get() > 0) {
                                 LOG.warn("Re-connection to {} was successful but {} messages has been lost so far", address.toString(), messagesLost.get());
-                            }
+    }
                         } else {
                             Throwable cause = future.getCause();
                             reschedule(cause);
@@ -510,8 +546,7 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
                 throw new RuntimeException("Giving up to scheduleConnect to " + dstAddressPrefixedName + " after " +
                         connectionAttempts + " failed attempts. " + messagesLost.get() + " messages were lost");
 
-            }
+    }
         }
     }
-
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/444ec05e/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
index 5d27a16..10c5059 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
@@ -21,10 +21,9 @@ import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
 import org.jboss.netty.util.HashedWheelTimer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -55,6 +54,7 @@ public class Context implements IContext {
         int maxWorkers = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_CLIENT_WORKER_THREADS));
 		ThreadFactory bossFactory = new NettyRenameThreadFactory("client" + "-boss");
         ThreadFactory workerFactory = new NettyRenameThreadFactory("client" + "-worker");
+        // TODO investigate impact of having one worker
         if (maxWorkers > 0) {
             clientChannelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(bossFactory),
                     Executors.newCachedThreadPool(workerFactory), maxWorkers);
@@ -103,12 +103,10 @@ public class Context implements IContext {
         for (IConnection conn : connections.values()) {
             conn.close();
         }
-
         connections = null;
 
         //we need to release resources associated with client channel factory
         clientChannelFactory.releaseExternalResources();
-
     }
 
     private String key(String host, int port) {

http://git-wip-us.apache.org/repos/asf/storm/blob/444ec05e/storm-core/src/jvm/backtype/storm/messaging/netty/ControlMessage.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/ControlMessage.java b/storm-core/src/jvm/backtype/storm/messaging/netty/ControlMessage.java
index fb3efe6..bffd953 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/ControlMessage.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/ControlMessage.java
@@ -23,7 +23,7 @@ import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.buffer.ChannelBufferOutputStream;
 import org.jboss.netty.buffer.ChannelBuffers;
 
-enum ControlMessage {
+public enum ControlMessage implements INettySerializable {
     CLOSE_MESSAGE((short)-100),
     EOB_MESSAGE((short)-201),
     OK_RESPONSE((short)-200),
@@ -43,14 +43,14 @@ enum ControlMessage {
      * @param encoded
      * @return
      */
-    static ControlMessage mkMessage(short encoded) {
+    public static ControlMessage mkMessage(short encoded) {
         for(ControlMessage cm: ControlMessage.values()) {
           if(encoded == cm.code) return cm;
         }
         return null;
     }
 
-    int encodeLength() {
+    public int encodeLength() {
         return 2; //short
     }
     
@@ -58,14 +58,19 @@ enum ControlMessage {
      * encode the current Control Message into a channel buffer
      * @throws Exception
      */
-    ChannelBuffer buffer() throws IOException {
+    public ChannelBuffer buffer() throws IOException {
         ChannelBufferOutputStream bout = new ChannelBufferOutputStream(ChannelBuffers.directBuffer(encodeLength()));      
         write(bout);
         bout.close();
         return bout.buffer();
     }
 
-    void write(ChannelBufferOutputStream bout) throws IOException {
+    public static ControlMessage read(byte[] serial) {
+        ChannelBuffer cm_buffer = ChannelBuffers.copiedBuffer(serial);
+        return mkMessage(cm_buffer.getShort(0));
+    }
+    
+    public void write(ChannelBufferOutputStream bout) throws IOException {
         bout.writeShort(code);        
     } 
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/444ec05e/storm-core/src/jvm/backtype/storm/messaging/netty/INettySerializable.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/INettySerializable.java b/storm-core/src/jvm/backtype/storm/messaging/netty/INettySerializable.java
new file mode 100644
index 0000000..945e6e9
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/INettySerializable.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.messaging.netty;
+
+import java.io.IOException;
+import org.jboss.netty.buffer.ChannelBuffer;
+
+public interface INettySerializable {
+    public ChannelBuffer buffer() throws IOException;
+    public int encodeLength();
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/444ec05e/storm-core/src/jvm/backtype/storm/messaging/netty/ISaslClient.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/ISaslClient.java b/storm-core/src/jvm/backtype/storm/messaging/netty/ISaslClient.java
new file mode 100644
index 0000000..57dcfe8
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/ISaslClient.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.messaging.netty;
+
+import org.jboss.netty.channel.Channel;
+import backtype.storm.Config;
+
+public interface ISaslClient {
+    void channelConnected(Channel channel);
+    void channelReady();
+    String name();
+    String secretKey();
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/444ec05e/storm-core/src/jvm/backtype/storm/messaging/netty/ISaslServer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/ISaslServer.java b/storm-core/src/jvm/backtype/storm/messaging/netty/ISaslServer.java
new file mode 100644
index 0000000..4203dcc
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/ISaslServer.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.messaging.netty;
+
+import org.jboss.netty.channel.Channel;
+
+public interface ISaslServer extends IServer {
+    String name();
+    String secretKey();
+    void authenticated(Channel c);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/444ec05e/storm-core/src/jvm/backtype/storm/messaging/netty/IServer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/IServer.java b/storm-core/src/jvm/backtype/storm/messaging/netty/IServer.java
new file mode 100644
index 0000000..d046492
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/IServer.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.messaging.netty;
+
+import org.jboss.netty.channel.Channel;
+
+public interface IServer {
+    void channelConnected(Channel c);
+    void received(Object message, String remote, Channel channel) throws InterruptedException;
+    void closeChannel(Channel c);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/444ec05e/storm-core/src/jvm/backtype/storm/messaging/netty/KerberosSaslClientHandler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/KerberosSaslClientHandler.java b/storm-core/src/jvm/backtype/storm/messaging/netty/KerberosSaslClientHandler.java
new file mode 100644
index 0000000..9ae34fe
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/KerberosSaslClientHandler.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.messaging.netty;
+
+import java.io.IOException;
+import java.util.Map;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KerberosSaslClientHandler extends SimpleChannelUpstreamHandler {
+
+    private static final Logger LOG = LoggerFactory
+            .getLogger(KerberosSaslClientHandler.class);
+    private ISaslClient client;
+    long start_time;
+    /** Used for client or server's token to send or receive from each other. */
+    private Map storm_conf;
+    private String jaas_section;
+
+    public KerberosSaslClientHandler(ISaslClient client, Map storm_conf, String jaas_section) throws IOException {
+        this.client = client;
+        this.storm_conf = storm_conf;
+        this.jaas_section = jaas_section;
+        start_time = System.currentTimeMillis();
+    }
+
+    @Override
+    public void channelConnected(ChannelHandlerContext ctx,
+            ChannelStateEvent event) {
+        // register the newly established channel
+        Channel channel = ctx.getChannel();
+        client.channelConnected(channel);
+
+        LOG.info("Connection established from {} to {}",
+                 channel.getLocalAddress(), channel.getRemoteAddress());
+
+        try {
+            KerberosSaslNettyClient saslNettyClient = KerberosSaslNettyClientState.getKerberosSaslNettyClient
+                    .get(channel);
+
+            if (saslNettyClient == null) {
+                LOG.debug("Creating saslNettyClient now for channel: {}",
+                          channel);
+                saslNettyClient = new KerberosSaslNettyClient(storm_conf, jaas_section);
+                KerberosSaslNettyClientState.getKerberosSaslNettyClient.set(channel,
+                        saslNettyClient);
+            }
+            LOG.debug("Going to initiate Kerberos negotiations.");
+            byte[] initialChallenge = saslNettyClient.saslResponse(new SaslMessageToken(new byte[0]));
+            LOG.debug("Sending initial challenge: {}", initialChallenge);
+            channel.write(new SaslMessageToken(initialChallenge));
+        } catch (Exception e) {
+            LOG.error("Failed to authenticate with server due to error: ",
+                      e);
+        }
+        return;
+
+    }
+
+    @Override
+    public void messageReceived(ChannelHandlerContext ctx, MessageEvent event)
+            throws Exception {
+        LOG.debug("send/recv time (ms): {}",
+                (System.currentTimeMillis() - start_time));
+
+        Channel channel = ctx.getChannel();
+
+        // Generate SASL response to server using Channel-local SASL client.
+        KerberosSaslNettyClient saslNettyClient = KerberosSaslNettyClientState.getKerberosSaslNettyClient
+                .get(channel);
+        if (saslNettyClient == null) {
+            throw new Exception("saslNettyClient was unexpectedly null for channel:" + channel);
+        }
+
+        // examine the response message from server
+        if (event.getMessage() instanceof ControlMessage) {
+            ControlMessage msg = (ControlMessage) event.getMessage();
+            if (msg == ControlMessage.SASL_COMPLETE_REQUEST) {
+                LOG.debug("Server has sent us the SaslComplete message. Allowing normal work to proceed.");
+
+                if (!saslNettyClient.isComplete()) {
+                    String message = "Server returned a Sasl-complete message, but as far as we can tell, we are not authenticated yet.";
+                    LOG.error(message);
+                    throw new Exception(message);
+                }
+                ctx.getPipeline().remove(this);
+                this.client.channelReady();
+
+                // We call fireMessageReceived since the client is allowed to
+                // perform this request. The client's request will now proceed
+                // to the next pipeline component namely StormClientHandler.
+                Channels.fireMessageReceived(ctx, msg);
+            }
+            else {
+                LOG.warn("Unexpected control message: {}", msg);
+            }
+            return;
+        }
+        else if (event.getMessage() instanceof SaslMessageToken) {
+            SaslMessageToken saslTokenMessage = (SaslMessageToken) event
+                .getMessage();
+            LOG.debug("Responding to server's token of length: {}",
+                      saslTokenMessage.getSaslToken().length);
+
+            // Generate SASL response (but we only actually send the response if
+            // it's non-null.
+            byte[] responseToServer = saslNettyClient
+                .saslResponse(saslTokenMessage);
+            if (responseToServer == null) {
+                // If we generate a null response, then authentication has completed
+                // (if not, warn), and return without sending a response back to the
+                // server.
+                LOG.debug("Response to server is null: authentication should now be complete.");
+                if (!saslNettyClient.isComplete()) {
+                    LOG.warn("Generated a null response, but authentication is not complete.");
+                    throw new Exception("Our reponse to the server is null, but as far as we can tell, we are not authenticated yet.");
+                }
+                this.client.channelReady();
+                return;
+            } else {
+                LOG.debug("Response to server token has length: {}",
+                          responseToServer.length);
+            }
+            // Construct a message containing the SASL response and send it to the
+            // server.
+            SaslMessageToken saslResponse = new SaslMessageToken(responseToServer);
+            channel.write(saslResponse);
+        }
+        else {
+            LOG.error("Unexpected message from server: {}", event.getMessage());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/444ec05e/storm-core/src/jvm/backtype/storm/messaging/netty/KerberosSaslNettyClient.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/KerberosSaslNettyClient.java b/storm-core/src/jvm/backtype/storm/messaging/netty/KerberosSaslNettyClient.java
new file mode 100644
index 0000000..32afab0
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/KerberosSaslNettyClient.java
@@ -0,0 +1,212 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.messaging.netty;
+
+import backtype.storm.Config;
+import backtype.storm.security.auth.AuthUtils;
+import java.io.IOException;
+import java.security.Principal;
+import java.security.PrivilegedActionException;
+import java.security.PrivilegedExceptionAction;
+import java.util.Map;
+import java.util.TreeMap;
+import javax.security.auth.Subject;
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.auth.kerberos.KerberosTicket;
+import javax.security.auth.login.Configuration;
+import javax.security.auth.login.LoginException;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+import org.apache.zookeeper.Login;
+import org.apache.zookeeper.server.auth.KerberosName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implements SASL logic for storm worker client processes.
+ */
+public class KerberosSaslNettyClient {
+
+    private static final Logger LOG = LoggerFactory
+            .getLogger(KerberosSaslNettyClient.class);
+
+    /**
+     * Used to respond to server's counterpart, SaslServer with SASL tokens
+     * represented as byte arrays.
+     */
+    private SaslClient saslClient;
+    private Subject subject;
+    private String jaas_section;
+    
+    /**
+     * Create a KerberosSaslNettyClient for authentication with servers.
+     */
+    public KerberosSaslNettyClient(Map storm_conf, String jaas_section) {
+        LOG.debug("KerberosSaslNettyClient: Creating SASL {} client to authenticate to server ",
+                  SaslUtils.KERBEROS);
+        
+        LOG.info("Creating Kerberos Client.");
+        
+        Configuration login_conf;
+        try {
+            login_conf = AuthUtils.GetConfiguration(storm_conf);
+        }
+        catch (Throwable t) {
+            LOG.error("Failed to get login_conf: ", t);
+            throw t;
+        }
+        LOG.debug("KerberosSaslNettyClient: authmethod {}", SaslUtils.KERBEROS);
+        
+        SaslClientCallbackHandler ch = new SaslClientCallbackHandler();
+        
+        subject = null;
+        try {
+            LOG.debug("Setting Configuration to login_config: {}", login_conf);
+            //specify a configuration object to be used
+            Configuration.setConfiguration(login_conf); 
+            //now login
+            LOG.debug("Trying to login.");
+            Login login = new Login(jaas_section, ch);
+            subject = login.getSubject();
+            LOG.debug("Got Subject: {}", subject.toString());
+        } catch (LoginException ex) {
+            LOG.error("Client failed to login in principal:" + ex, ex);
+            throw new RuntimeException(ex);
+        }
+        
+        //check the credential of our principal
+        if (subject.getPrivateCredentials(KerberosTicket.class).isEmpty()) { 
+            LOG.error("Failed to verify user principal.");
+            throw new RuntimeException("Fail to verify user principal with section \"" +
+                                       jaas_section +
+                                       "\" in login configuration file " +
+                                       login_conf);
+        }
+
+        String serviceName = null;
+        try {
+            serviceName = AuthUtils.get(login_conf, jaas_section, "serviceName");
+        }
+        catch (IOException e) {
+            LOG.error("Failed to get service name.", e);
+            throw new RuntimeException(e);
+        }
+
+        try {
+            Principal principal = (Principal)subject.getPrincipals().toArray()[0];
+            final String fPrincipalName = principal.getName();
+            KerberosName kerbName = new KerberosName(principal.getName());
+            final String fHost = (String)storm_conf.get(Config.PACEMAKER_HOST);
+            final String fServiceName = serviceName;
+            final CallbackHandler fch = ch;
+            LOG.debug("Kerberos Client with principal: {}, host: {}", fPrincipalName, fHost);
+            saslClient = Subject.doAs(subject, new PrivilegedExceptionAction<SaslClient>() {
+                    public SaslClient run() {
+                        try {
+                            Map<String, String> props = new TreeMap<String,String>();
+                            props.put(Sasl.QOP, "auth");
+                            props.put(Sasl.SERVER_AUTH, "false");
+                            return Sasl.createSaslClient(
+                                new String[] { SaslUtils.KERBEROS },
+                                fPrincipalName,
+                                fServiceName,
+                                fHost,
+                                props, fch);
+                        }
+                        catch (Exception e) {
+                            LOG.error("Subject failed to create sasl client.", e);
+                            return null;
+                        }
+                    }
+                });
+            LOG.info("Got Client: {}", saslClient);
+            
+        } catch (PrivilegedActionException e) {
+            LOG.error("KerberosSaslNettyClient: Could not create Sasl Netty Client.");
+            throw new RuntimeException(e);
+        }
+}
+
+    public boolean isComplete() {
+        return saslClient.isComplete();
+    }
+
+    /**
+     * Respond to server's SASL token.
+     * 
+     * @param saslTokenMessage
+     *            contains server's SASL token
+     * @return client's response SASL token
+     */
+    public byte[] saslResponse(SaslMessageToken saslTokenMessage) {
+        try {
+            final SaslMessageToken fSaslTokenMessage = saslTokenMessage;
+            byte [] retval = Subject.doAs(subject, new PrivilegedExceptionAction<byte[]>() {
+                    public byte[] run() {
+                        try {
+                            byte[] retval = saslClient.evaluateChallenge(fSaslTokenMessage
+                                                                         .getSaslToken());
+                            return retval;
+                        } catch (SaslException e) {
+                            LOG.error("saslResponse: Failed to respond to SASL server's token:",
+                                      e);
+                            throw new RuntimeException(e);
+                        }
+                    }
+                });
+            return retval;
+        }
+        catch (PrivilegedActionException e) {
+            LOG.error("Failed to generate response for token: ", e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Implementation of javax.security.auth.callback.CallbackHandler that works
+     * with Storm topology tokens.
+     */
+    private static class SaslClientCallbackHandler implements CallbackHandler {
+
+        /**
+         * Set private members using topology token.
+         * 
+         * @param topologyToken
+         */
+        public SaslClientCallbackHandler() {
+        }
+
+        /**
+         * Implementation used to respond to SASL tokens from server.
+         * 
+         * @param callbacks
+         *            objects that indicate what credential information the
+         *            server's SaslServer requires from the client.
+         * @throws UnsupportedCallbackException
+         */
+        public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
+            for (Callback callback : callbacks) {
+                LOG.info("Kerberos Client Callback Handler got callback: {}", callback.getClass());
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/444ec05e/storm-core/src/jvm/backtype/storm/messaging/netty/KerberosSaslNettyClientState.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/KerberosSaslNettyClientState.java b/storm-core/src/jvm/backtype/storm/messaging/netty/KerberosSaslNettyClientState.java
new file mode 100644
index 0000000..1283d9b
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/KerberosSaslNettyClientState.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.messaging.netty;
+
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelLocal;
+
+final class KerberosSaslNettyClientState {
+
+	public static final ChannelLocal<KerberosSaslNettyClient> getKerberosSaslNettyClient = new ChannelLocal<KerberosSaslNettyClient>() {
+		protected KerberosSaslNettyClient initialValue(Channel channel) {
+			return null;
+		}
+	};
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/444ec05e/storm-core/src/jvm/backtype/storm/messaging/netty/KerberosSaslNettyServer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/KerberosSaslNettyServer.java b/storm-core/src/jvm/backtype/storm/messaging/netty/KerberosSaslNettyServer.java
new file mode 100644
index 0000000..a0003c6
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/KerberosSaslNettyServer.java
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.messaging.netty;
+
+import backtype.storm.Config;
+import backtype.storm.security.auth.AuthUtils;
+import backtype.storm.security.auth.KerberosPrincipalToLocal;
+import java.io.IOException;
+import java.security.Principal;
+import java.security.PrivilegedActionException;
+import java.security.PrivilegedExceptionAction;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import javax.security.auth.Subject;
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.auth.kerberos.KerberosPrincipal;
+import javax.security.auth.kerberos.KerberosTicket;
+import javax.security.auth.login.Configuration;
+import javax.security.auth.login.LoginException;
+import javax.security.sasl.AuthorizeCallback;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+import org.apache.zookeeper.Login;
+import org.apache.zookeeper.server.auth.KerberosName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+class KerberosSaslNettyServer {
+
+    private static final Logger LOG = LoggerFactory
+        .getLogger(KerberosSaslNettyServer.class);
+
+    private SaslServer saslServer;
+    private Subject subject;
+    private String jaas_section;
+    private List<String> authorizedUsers;
+    
+    KerberosSaslNettyServer(Map storm_conf, String jaas_section, List<String> authorizedUsers) {
+        this.authorizedUsers = authorizedUsers;
+        LOG.debug("Getting Configuration.");
+        Configuration login_conf;
+        try {
+            login_conf = AuthUtils.GetConfiguration(storm_conf);
+        }
+        catch (Throwable t) {
+            LOG.error("Failed to get login_conf: ", t);
+            throw t;
+        }
+            
+        LOG.debug("KerberosSaslNettyServer: authmethod {}", SaslUtils.KERBEROS);
+
+        KerberosSaslCallbackHandler ch = new KerberosSaslNettyServer.KerberosSaslCallbackHandler(storm_conf, authorizedUsers);
+        
+        //login our principal
+        subject = null;
+        try {
+            LOG.debug("Setting Configuration to login_config: {}", login_conf);
+            //specify a configuration object to be used
+            Configuration.setConfiguration(login_conf); 
+            //now login
+            LOG.debug("Trying to login.");
+            Login login = new Login(jaas_section, ch);
+            subject = login.getSubject();
+            LOG.debug("Got Subject: {}", subject.toString());
+        } catch (LoginException ex) {
+            LOG.error("Server failed to login in principal:", ex);
+            throw new RuntimeException(ex);
+        }
+        
+        //check the credential of our principal
+        if (subject.getPrivateCredentials(KerberosTicket.class).isEmpty()) { 
+            LOG.error("Failed to verifyuser principal.");
+            throw new RuntimeException("Fail to verify user principal with section \""
+                                       + jaas_section
+                                       + "\" in login configuration file "
+                                       + login_conf);
+        }
+
+        try {    
+            LOG.info("Creating Kerberos Server.");
+            final CallbackHandler fch = ch;
+            Principal p = (Principal)subject.getPrincipals().toArray()[0];
+            KerberosName kName = new KerberosName(p.getName());
+            final String fHost = kName.getHostName();
+            final String fServiceName = kName.getServiceName();
+            LOG.debug("Server with host: {}", fHost);
+            saslServer =
+                Subject.doAs(subject, new PrivilegedExceptionAction<SaslServer>() {
+                        public SaslServer run() {
+                            try {
+                                Map<String, String> props = new TreeMap<String,String>();
+                                props.put(Sasl.QOP, "auth");
+                                props.put(Sasl.SERVER_AUTH, "false");
+                                return Sasl.createSaslServer(SaslUtils.KERBEROS,
+                                                             fServiceName,
+                                                             fHost, props, fch);
+                            }
+                            catch (Exception e) {
+                                LOG.error("Subject failed to create sasl server.", e);
+                                return null;
+                            }
+                        }
+                    });
+            LOG.info("Got Server: {}", saslServer);
+                 
+        } catch (PrivilegedActionException e) {
+            LOG.error("KerberosSaslNettyServer: Could not create SaslServer: ", e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    public boolean isComplete() {
+        return saslServer.isComplete();
+    }
+
+    public String getUserName() {
+        return saslServer.getAuthorizationID();
+    }
+
+    private String getPrincipal(Subject subject) {
+        Set<Principal> principals = (Set<Principal>)subject.getPrincipals();
+        if (principals==null || principals.size()<1) {
+            LOG.info("No principal found in login subject");
+            return null;
+        }
+        return ((Principal)(principals.toArray()[0])).getName();
+    }
+
+    /** CallbackHandler for SASL DIGEST-MD5 mechanism */
+    public static class KerberosSaslCallbackHandler implements CallbackHandler {
+
+        /** Used to authenticate the clients */
+        private Map config;
+        private List<String> authorizedUsers;
+
+        public KerberosSaslCallbackHandler(Map config, List<String> authorizedUsers) {
+            LOG.debug("KerberosSaslCallback: Creating KerberosSaslCallback handler.");
+            this.config = config;
+            this.authorizedUsers = authorizedUsers;
+        }
+
+        @Override
+        public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
+            for (Callback callback : callbacks) {
+                LOG.info("Kerberos Callback Handler got callback: {}", callback.getClass());
+                if(callback instanceof AuthorizeCallback) {
+                    AuthorizeCallback ac = (AuthorizeCallback)callback;
+                    if(!ac.getAuthenticationID().equals(ac.getAuthorizationID())) {
+                        LOG.debug("{} != {}", ac.getAuthenticationID(), ac.getAuthorizationID());
+                        continue;
+                    }
+
+                    LOG.debug("Authorized Users: {}", authorizedUsers);
+                    LOG.debug("Checking authorization for: {}", ac.getAuthorizationID());
+                    for(String user : authorizedUsers) {
+                        String requester = ac.getAuthorizationID();
+
+                        KerberosPrincipal principal = new KerberosPrincipal(requester);
+                        requester = new KerberosPrincipalToLocal().toLocal(principal);
+
+                        if(requester.equals(user) ) {
+                            ac.setAuthorized(true);
+                            break;
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Used by SaslTokenMessage::processToken() to respond to server SASL
+     * tokens.
+     * 
+     * @param token
+     *            Server's SASL token
+     * @return token to send back to the server.
+     */
+    public byte[] response(final byte[] token) {
+        try {
+            byte [] retval = Subject.doAs(subject, new PrivilegedExceptionAction<byte[]>() {
+                    public byte[] run(){
+                        try {
+                            LOG.debug("response: Responding to input token of length: {}",
+                                      token.length);
+                            byte[] retval = saslServer.evaluateResponse(token);
+                            return retval;
+                        } catch (SaslException e) {
+                            LOG.error("response: Failed to evaluate client token of length: {} : {}",
+                                      token.length, e);
+                            throw new RuntimeException(e);
+                        }
+                    }
+                });
+            return retval;
+        }
+        catch (PrivilegedActionException e) {
+            LOG.error("Failed to generate response for token: ", e);
+            throw new RuntimeException(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/444ec05e/storm-core/src/jvm/backtype/storm/messaging/netty/KerberosSaslNettyServerState.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/KerberosSaslNettyServerState.java b/storm-core/src/jvm/backtype/storm/messaging/netty/KerberosSaslNettyServerState.java
new file mode 100644
index 0000000..064dc91
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/KerberosSaslNettyServerState.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.messaging.netty;
+
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelLocal;
+
+final class KerberosSaslNettyServerState {
+
+    public static final ChannelLocal<KerberosSaslNettyServer> getKerberosSaslNettyServer = new ChannelLocal<KerberosSaslNettyServer>() {
+            protected KerberosSaslNettyServer initialValue(Channel channel) {
+                return null;
+            }
+	};
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/444ec05e/storm-core/src/jvm/backtype/storm/messaging/netty/KerberosSaslServerHandler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/KerberosSaslServerHandler.java b/storm-core/src/jvm/backtype/storm/messaging/netty/KerberosSaslServerHandler.java
new file mode 100644
index 0000000..3ed3fd7
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/KerberosSaslServerHandler.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.messaging.netty;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KerberosSaslServerHandler extends SimpleChannelUpstreamHandler {
+
+    ISaslServer server;
+    /** Used for client or server's token to send or receive from each other. */
+    private Map storm_conf;
+    private String jaas_section;
+    private List<String> authorizedUsers;
+    
+    private static final Logger LOG = LoggerFactory
+            .getLogger(KerberosSaslServerHandler.class);
+
+    public KerberosSaslServerHandler(ISaslServer server, Map storm_conf, String jaas_section, List<String> authorizedUsers) throws IOException {
+        this.server = server;
+        this.storm_conf = storm_conf;
+        this.jaas_section = jaas_section;
+        this.authorizedUsers = authorizedUsers;
+    }
+
+    @Override
+    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
+            throws Exception {
+        Object msg = e.getMessage();
+        if (msg == null)
+            return;
+
+        Channel channel = ctx.getChannel();
+
+        
+        if (msg instanceof SaslMessageToken) {
+            // initialize server-side SASL functionality, if we haven't yet
+            // (in which case we are looking at the first SASL message from the
+            // client).
+            try {
+                LOG.debug("Got SaslMessageToken!");
+
+                KerberosSaslNettyServer saslNettyServer = KerberosSaslNettyServerState.getKerberosSaslNettyServer
+                    .get(channel);
+                if (saslNettyServer == null) {
+                    LOG.debug("No saslNettyServer for {}  yet; creating now, with topology token: ", channel);
+                    try {
+                        saslNettyServer = new KerberosSaslNettyServer(storm_conf, jaas_section, authorizedUsers);
+                    } catch (RuntimeException ioe) {
+                        LOG.error("Error occurred while creating saslNettyServer on server {} for client {}", 
+                                  channel.getLocalAddress(), channel.getRemoteAddress());
+                        saslNettyServer = null;
+                    }
+                    
+                    KerberosSaslNettyServerState.getKerberosSaslNettyServer.set(channel,
+                                                                                saslNettyServer);
+                } else {
+                    LOG.debug("Found existing saslNettyServer on server: {} for client {}",
+                              channel.getLocalAddress(), channel.getRemoteAddress());
+                }
+
+                byte[] responseBytes = saslNettyServer.response(((SaslMessageToken) msg)
+                                                                .getSaslToken());
+                    
+                SaslMessageToken saslTokenMessageRequest = new SaslMessageToken(responseBytes);
+
+                if(saslTokenMessageRequest.getSaslToken() == null) {
+                    channel.write(ControlMessage.SASL_COMPLETE_REQUEST);
+                }
+                else {   
+                    // Send response to client.
+                    channel.write(saslTokenMessageRequest);
+                }
+                    
+                if (saslNettyServer.isComplete()) {
+                    // If authentication of client is complete, we will also send a
+                    // SASL-Complete message to the client.
+                    LOG.info("SASL authentication is complete for client with username: {}",
+                             saslNettyServer.getUserName());
+                    channel.write(ControlMessage.SASL_COMPLETE_REQUEST);
+                    LOG.debug("Removing SaslServerHandler from pipeline since SASL authentication is complete.");
+                    ctx.getPipeline().remove(this);
+                    server.authenticated(channel);
+                }
+                return;
+            }
+            catch (Exception ex) {
+                LOG.error("Failed to handle SaslMessageToken: ", ex);
+                throw ex;
+            }
+        } else {
+            // Client should not be sending other-than-SASL messages before
+            // SaslServerHandler has removed itself from the pipeline. Such
+            // non-SASL requests will be denied by the Authorize channel handler
+            // (the next handler upstream in the server pipeline) if SASL
+            // authentication has not completed.
+            LOG.warn("Sending upstream an unexpected non-SASL message : {}",
+                     msg);
+            Channels.fireMessageReceived(ctx, msg);
+        }
+    }
+
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
+        if(server != null) server.closeChannel(e.getChannel());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/444ec05e/storm-core/src/jvm/backtype/storm/messaging/netty/MessageDecoder.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/MessageDecoder.java b/storm-core/src/jvm/backtype/storm/messaging/netty/MessageDecoder.java
index 7d8bf54..8c99e78 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/MessageDecoder.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/MessageDecoder.java
@@ -70,7 +70,7 @@ public class MessageDecoder extends FrameDecoder {
             }
             
             //case 2: SaslTokenMessageRequest
-            if(code==-500) {
+            if(code == SaslMessageToken.IDENTIFIER) {
             	// Make sure that we have received at least an integer (length) 
                 if (buf.readableBytes() < 4) {
                     //need more data
@@ -142,4 +142,4 @@ public class MessageDecoder extends FrameDecoder {
             return ret;
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/444ec05e/storm-core/src/jvm/backtype/storm/messaging/netty/NettyRenameThreadFactory.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/NettyRenameThreadFactory.java b/storm-core/src/jvm/backtype/storm/messaging/netty/NettyRenameThreadFactory.java
index 3a91a58..2a1cdea 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/NettyRenameThreadFactory.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/NettyRenameThreadFactory.java
@@ -33,8 +33,9 @@ public class NettyRenameThreadFactory  implements ThreadFactory {
     final ThreadGroup group;
     final AtomicInteger index = new AtomicInteger(1);
     final String name;
+    static final NettyUncaughtExceptionHandler uncaughtExceptionHandler = new NettyUncaughtExceptionHandler();
 
-    NettyRenameThreadFactory(String name) {
+    public NettyRenameThreadFactory(String name) {
         SecurityManager s = System.getSecurityManager();
         group = (s != null)? s.getThreadGroup() :
                              Thread.currentThread().getThreadGroup();
@@ -43,10 +44,13 @@ public class NettyRenameThreadFactory  implements ThreadFactory {
 
     public Thread newThread(Runnable r) {
         Thread t = new Thread(group, r, name + "-" + index.getAndIncrement(), 0);
-        if (t.isDaemon())
+        if (t.isDaemon()) {
             t.setDaemon(false);
-        if (t.getPriority() != Thread.NORM_PRIORITY)
+        }
+        if (t.getPriority() != Thread.NORM_PRIORITY) {
             t.setPriority(Thread.NORM_PRIORITY);
+        }
+        t.setUncaughtExceptionHandler(uncaughtExceptionHandler);
         return t;
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/444ec05e/storm-core/src/jvm/backtype/storm/messaging/netty/NettyUncaughtExceptionHandler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/NettyUncaughtExceptionHandler.java b/storm-core/src/jvm/backtype/storm/messaging/netty/NettyUncaughtExceptionHandler.java
new file mode 100644
index 0000000..3d31544
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/NettyUncaughtExceptionHandler.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.messaging.netty;
+
+import backtype.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class NettyUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
+  private static final Logger LOG = LoggerFactory.getLogger(NettyUncaughtExceptionHandler.class);
+  @Override
+  public void uncaughtException(Thread t, Throwable e) {
+    try {
+      Utils.handleUncaughtException(e);
+    } catch (Error error) {
+      LOG.info("Received error in netty thread.. terminating server...");
+      Runtime.getRuntime().exit(1);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/444ec05e/storm-core/src/jvm/backtype/storm/messaging/netty/SaslMessageToken.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslMessageToken.java b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslMessageToken.java
index d0d3ca1..2fe5c2d 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslMessageToken.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslMessageToken.java
@@ -17,6 +17,7 @@
  */
 package backtype.storm.messaging.netty;
 
+import java.io.IOException;
 import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.buffer.ChannelBufferOutputStream;
 import org.jboss.netty.buffer.ChannelBuffers;
@@ -26,7 +27,10 @@ import org.slf4j.LoggerFactory;
 /**
  * Send and receive SASL tokens.
  */
-public class SaslMessageToken {
+
+public class SaslMessageToken implements INettySerializable {
+    public static final short IDENTIFIER = -500;
+
     /** Class logger */
     private static final Logger LOG = LoggerFactory
             .getLogger(SaslMessageToken.class);
@@ -69,7 +73,8 @@ public class SaslMessageToken {
         this.token = token;
     }
 
-    int encodeLength() {
+
+    public int encodeLength() {
         return 2 + 4 + token.length;
     }
 
@@ -80,15 +85,15 @@ public class SaslMessageToken {
      * 
      * @throws Exception
      */
-    ChannelBuffer buffer() throws Exception {
+    public ChannelBuffer buffer() throws IOException {
         ChannelBufferOutputStream bout = new ChannelBufferOutputStream(
                 ChannelBuffers.directBuffer(encodeLength()));
-        short identifier = -500;
         int payload_len = 0;
         if (token != null)
             payload_len = token.length;
 
-        bout.writeShort((short) identifier);
+
+        bout.writeShort(IDENTIFIER);
         bout.writeInt((int) payload_len);
         if (payload_len > 0) {
             bout.write(token);
@@ -96,4 +101,16 @@ public class SaslMessageToken {
         bout.close();
         return bout.buffer();
     }
+    
+    public static SaslMessageToken read(byte[] serial) {
+        ChannelBuffer sm_buffer = ChannelBuffers.copiedBuffer(serial);
+        short identifier = sm_buffer.readShort();
+        int payload_len = sm_buffer.readInt();
+        if(identifier != -500) {
+            return null;
+        }
+        byte token[] = new byte[payload_len];
+        sm_buffer.readBytes(token, 0, payload_len);
+        return new SaslMessageToken(token);
+    }
 }