You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by kn...@apache.org on 2015/11/23 22:07:42 UTC
[03/37] storm git commit: PACEMAKER OPEN SOURCE!
http://git-wip-us.apache.org/repos/asf/storm/blob/444ec05e/storm-core/src/jvm/backtype/storm/generated/NotAliveException.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/generated/NotAliveException.java b/storm-core/src/jvm/backtype/storm/generated/NotAliveException.java
index cbabcf9..63c3e4c 100644
--- a/storm-core/src/jvm/backtype/storm/generated/NotAliveException.java
+++ b/storm-core/src/jvm/backtype/storm/generated/NotAliveException.java
@@ -51,7 +51,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
-@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-10-9")
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-10-30")
public class NotAliveException extends TException implements org.apache.thrift.TBase<NotAliveException, NotAliveException._Fields>, java.io.Serializable, Cloneable, Comparable<NotAliveException> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("NotAliveException");
http://git-wip-us.apache.org/repos/asf/storm/blob/444ec05e/storm-core/src/jvm/backtype/storm/generated/NullStruct.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/generated/NullStruct.java b/storm-core/src/jvm/backtype/storm/generated/NullStruct.java
index 1b8208c..98dd8b9 100644
--- a/storm-core/src/jvm/backtype/storm/generated/NullStruct.java
+++ b/storm-core/src/jvm/backtype/storm/generated/NullStruct.java
@@ -51,7 +51,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
-@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-10-9")
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-10-30")
public class NullStruct implements org.apache.thrift.TBase<NullStruct, NullStruct._Fields>, java.io.Serializable, Cloneable, Comparable<NullStruct> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("NullStruct");
http://git-wip-us.apache.org/repos/asf/storm/blob/444ec05e/storm-core/src/jvm/backtype/storm/generated/RebalanceOptions.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/generated/RebalanceOptions.java b/storm-core/src/jvm/backtype/storm/generated/RebalanceOptions.java
index d859f5a..b3f916b 100644
--- a/storm-core/src/jvm/backtype/storm/generated/RebalanceOptions.java
+++ b/storm-core/src/jvm/backtype/storm/generated/RebalanceOptions.java
@@ -51,7 +51,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
-@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-10-9")
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-10-30")
public class RebalanceOptions implements org.apache.thrift.TBase<RebalanceOptions, RebalanceOptions._Fields>, java.io.Serializable, Cloneable, Comparable<RebalanceOptions> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RebalanceOptions");
http://git-wip-us.apache.org/repos/asf/storm/blob/444ec05e/storm-core/src/jvm/backtype/storm/generated/ShellComponent.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/generated/ShellComponent.java b/storm-core/src/jvm/backtype/storm/generated/ShellComponent.java
index ab86c6a..8647419 100644
--- a/storm-core/src/jvm/backtype/storm/generated/ShellComponent.java
+++ b/storm-core/src/jvm/backtype/storm/generated/ShellComponent.java
@@ -51,7 +51,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
-@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-10-9")
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-10-30")
public class ShellComponent implements org.apache.thrift.TBase<ShellComponent, ShellComponent._Fields>, java.io.Serializable, Cloneable, Comparable<ShellComponent> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("ShellComponent");
http://git-wip-us.apache.org/repos/asf/storm/blob/444ec05e/storm-core/src/jvm/backtype/storm/generated/SpoutAggregateStats.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/generated/SpoutAggregateStats.java b/storm-core/src/jvm/backtype/storm/generated/SpoutAggregateStats.java
index a8d6ec7..bc128aa 100644
--- a/storm-core/src/jvm/backtype/storm/generated/SpoutAggregateStats.java
+++ b/storm-core/src/jvm/backtype/storm/generated/SpoutAggregateStats.java
@@ -51,7 +51,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
-@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-10-9")
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-10-30")
public class SpoutAggregateStats implements org.apache.thrift.TBase<SpoutAggregateStats, SpoutAggregateStats._Fields>, java.io.Serializable, Cloneable, Comparable<SpoutAggregateStats> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("SpoutAggregateStats");
http://git-wip-us.apache.org/repos/asf/storm/blob/444ec05e/storm-core/src/jvm/backtype/storm/generated/SpoutSpec.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/generated/SpoutSpec.java b/storm-core/src/jvm/backtype/storm/generated/SpoutSpec.java
index 3fc45cf..bb67050 100644
--- a/storm-core/src/jvm/backtype/storm/generated/SpoutSpec.java
+++ b/storm-core/src/jvm/backtype/storm/generated/SpoutSpec.java
@@ -51,7 +51,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
-@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-10-9")
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-10-30")
public class SpoutSpec implements org.apache.thrift.TBase<SpoutSpec, SpoutSpec._Fields>, java.io.Serializable, Cloneable, Comparable<SpoutSpec> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("SpoutSpec");
http://git-wip-us.apache.org/repos/asf/storm/blob/444ec05e/storm-core/src/jvm/backtype/storm/generated/SpoutStats.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/generated/SpoutStats.java b/storm-core/src/jvm/backtype/storm/generated/SpoutStats.java
index 478143f..d744184 100644
--- a/storm-core/src/jvm/backtype/storm/generated/SpoutStats.java
+++ b/storm-core/src/jvm/backtype/storm/generated/SpoutStats.java
@@ -51,7 +51,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
-@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-10-9")
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-10-30")
public class SpoutStats implements org.apache.thrift.TBase<SpoutStats, SpoutStats._Fields>, java.io.Serializable, Cloneable, Comparable<SpoutStats> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("SpoutStats");
http://git-wip-us.apache.org/repos/asf/storm/blob/444ec05e/storm-core/src/jvm/backtype/storm/generated/StateSpoutSpec.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/generated/StateSpoutSpec.java b/storm-core/src/jvm/backtype/storm/generated/StateSpoutSpec.java
index 530b7ca..1e5ffde 100644
--- a/storm-core/src/jvm/backtype/storm/generated/StateSpoutSpec.java
+++ b/storm-core/src/jvm/backtype/storm/generated/StateSpoutSpec.java
@@ -51,7 +51,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
-@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-10-9")
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-10-30")
public class StateSpoutSpec implements org.apache.thrift.TBase<StateSpoutSpec, StateSpoutSpec._Fields>, java.io.Serializable, Cloneable, Comparable<StateSpoutSpec> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("StateSpoutSpec");
http://git-wip-us.apache.org/repos/asf/storm/blob/444ec05e/storm-core/src/jvm/backtype/storm/generated/StormBase.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/generated/StormBase.java b/storm-core/src/jvm/backtype/storm/generated/StormBase.java
index f4af67a..6eed480 100644
--- a/storm-core/src/jvm/backtype/storm/generated/StormBase.java
+++ b/storm-core/src/jvm/backtype/storm/generated/StormBase.java
@@ -51,7 +51,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
-@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-10-9")
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-10-30")
public class StormBase implements org.apache.thrift.TBase<StormBase, StormBase._Fields>, java.io.Serializable, Cloneable, Comparable<StormBase> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("StormBase");
http://git-wip-us.apache.org/repos/asf/storm/blob/444ec05e/storm-core/src/jvm/backtype/storm/generated/StormTopology.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/generated/StormTopology.java b/storm-core/src/jvm/backtype/storm/generated/StormTopology.java
index 9b96fa3..eb74a18 100644
--- a/storm-core/src/jvm/backtype/storm/generated/StormTopology.java
+++ b/storm-core/src/jvm/backtype/storm/generated/StormTopology.java
@@ -51,7 +51,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
-@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-10-9")
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-10-30")
public class StormTopology implements org.apache.thrift.TBase<StormTopology, StormTopology._Fields>, java.io.Serializable, Cloneable, Comparable<StormTopology> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("StormTopology");
http://git-wip-us.apache.org/repos/asf/storm/blob/444ec05e/storm-core/src/jvm/backtype/storm/generated/StreamInfo.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/generated/StreamInfo.java b/storm-core/src/jvm/backtype/storm/generated/StreamInfo.java
index e3b0fdb..55b265a 100644
--- a/storm-core/src/jvm/backtype/storm/generated/StreamInfo.java
+++ b/storm-core/src/jvm/backtype/storm/generated/StreamInfo.java
@@ -51,7 +51,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
-@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-10-9")
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-10-30")
public class StreamInfo implements org.apache.thrift.TBase<StreamInfo, StreamInfo._Fields>, java.io.Serializable, Cloneable, Comparable<StreamInfo> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("StreamInfo");
http://git-wip-us.apache.org/repos/asf/storm/blob/444ec05e/storm-core/src/jvm/backtype/storm/generated/SubmitOptions.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/generated/SubmitOptions.java b/storm-core/src/jvm/backtype/storm/generated/SubmitOptions.java
index 358468a..1633361 100644
--- a/storm-core/src/jvm/backtype/storm/generated/SubmitOptions.java
+++ b/storm-core/src/jvm/backtype/storm/generated/SubmitOptions.java
@@ -51,7 +51,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
-@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-10-9")
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-10-30")
public class SubmitOptions implements org.apache.thrift.TBase<SubmitOptions, SubmitOptions._Fields>, java.io.Serializable, Cloneable, Comparable<SubmitOptions> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("SubmitOptions");
http://git-wip-us.apache.org/repos/asf/storm/blob/444ec05e/storm-core/src/jvm/backtype/storm/generated/SupervisorInfo.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/generated/SupervisorInfo.java b/storm-core/src/jvm/backtype/storm/generated/SupervisorInfo.java
index 6d68927..9bcb567 100644
--- a/storm-core/src/jvm/backtype/storm/generated/SupervisorInfo.java
+++ b/storm-core/src/jvm/backtype/storm/generated/SupervisorInfo.java
@@ -51,7 +51,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
-@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-10-9")
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-10-30")
public class SupervisorInfo implements org.apache.thrift.TBase<SupervisorInfo, SupervisorInfo._Fields>, java.io.Serializable, Cloneable, Comparable<SupervisorInfo> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("SupervisorInfo");
http://git-wip-us.apache.org/repos/asf/storm/blob/444ec05e/storm-core/src/jvm/backtype/storm/generated/SupervisorSummary.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/generated/SupervisorSummary.java b/storm-core/src/jvm/backtype/storm/generated/SupervisorSummary.java
index 7e36d0f..022ecb4 100644
--- a/storm-core/src/jvm/backtype/storm/generated/SupervisorSummary.java
+++ b/storm-core/src/jvm/backtype/storm/generated/SupervisorSummary.java
@@ -51,7 +51,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
-@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-10-9")
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-10-30")
public class SupervisorSummary implements org.apache.thrift.TBase<SupervisorSummary, SupervisorSummary._Fields>, java.io.Serializable, Cloneable, Comparable<SupervisorSummary> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("SupervisorSummary");
http://git-wip-us.apache.org/repos/asf/storm/blob/444ec05e/storm-core/src/jvm/backtype/storm/generated/ThriftSerializedObject.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/generated/ThriftSerializedObject.java b/storm-core/src/jvm/backtype/storm/generated/ThriftSerializedObject.java
index 4b2bc63..e233458 100644
--- a/storm-core/src/jvm/backtype/storm/generated/ThriftSerializedObject.java
+++ b/storm-core/src/jvm/backtype/storm/generated/ThriftSerializedObject.java
@@ -51,7 +51,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
-@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-10-9")
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-10-30")
public class ThriftSerializedObject implements org.apache.thrift.TBase<ThriftSerializedObject, ThriftSerializedObject._Fields>, java.io.Serializable, Cloneable, Comparable<ThriftSerializedObject> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("ThriftSerializedObject");
http://git-wip-us.apache.org/repos/asf/storm/blob/444ec05e/storm-core/src/jvm/backtype/storm/generated/TopologyInfo.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/generated/TopologyInfo.java b/storm-core/src/jvm/backtype/storm/generated/TopologyInfo.java
index 4f78417..c81eac5 100644
--- a/storm-core/src/jvm/backtype/storm/generated/TopologyInfo.java
+++ b/storm-core/src/jvm/backtype/storm/generated/TopologyInfo.java
@@ -51,7 +51,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
-@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-10-9")
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-10-30")
public class TopologyInfo implements org.apache.thrift.TBase<TopologyInfo, TopologyInfo._Fields>, java.io.Serializable, Cloneable, Comparable<TopologyInfo> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("TopologyInfo");
http://git-wip-us.apache.org/repos/asf/storm/blob/444ec05e/storm-core/src/jvm/backtype/storm/generated/TopologyPageInfo.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/generated/TopologyPageInfo.java b/storm-core/src/jvm/backtype/storm/generated/TopologyPageInfo.java
index 180b608..ad29c7d 100644
--- a/storm-core/src/jvm/backtype/storm/generated/TopologyPageInfo.java
+++ b/storm-core/src/jvm/backtype/storm/generated/TopologyPageInfo.java
@@ -51,7 +51,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
-@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-10-9")
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-10-30")
public class TopologyPageInfo implements org.apache.thrift.TBase<TopologyPageInfo, TopologyPageInfo._Fields>, java.io.Serializable, Cloneable, Comparable<TopologyPageInfo> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("TopologyPageInfo");
http://git-wip-us.apache.org/repos/asf/storm/blob/444ec05e/storm-core/src/jvm/backtype/storm/generated/TopologyStats.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/generated/TopologyStats.java b/storm-core/src/jvm/backtype/storm/generated/TopologyStats.java
index 0ff01de..ded0010 100644
--- a/storm-core/src/jvm/backtype/storm/generated/TopologyStats.java
+++ b/storm-core/src/jvm/backtype/storm/generated/TopologyStats.java
@@ -51,7 +51,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
-@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-10-9")
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-10-30")
public class TopologyStats implements org.apache.thrift.TBase<TopologyStats, TopologyStats._Fields>, java.io.Serializable, Cloneable, Comparable<TopologyStats> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("TopologyStats");
http://git-wip-us.apache.org/repos/asf/storm/blob/444ec05e/storm-core/src/jvm/backtype/storm/generated/TopologySummary.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/generated/TopologySummary.java b/storm-core/src/jvm/backtype/storm/generated/TopologySummary.java
index 055a01a..cfa5e24 100644
--- a/storm-core/src/jvm/backtype/storm/generated/TopologySummary.java
+++ b/storm-core/src/jvm/backtype/storm/generated/TopologySummary.java
@@ -51,7 +51,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
-@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-10-9")
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-10-30")
public class TopologySummary implements org.apache.thrift.TBase<TopologySummary, TopologySummary._Fields>, java.io.Serializable, Cloneable, Comparable<TopologySummary> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("TopologySummary");
http://git-wip-us.apache.org/repos/asf/storm/blob/444ec05e/storm-core/src/jvm/backtype/storm/generated/WorkerResources.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/generated/WorkerResources.java b/storm-core/src/jvm/backtype/storm/generated/WorkerResources.java
index 7cfadd7..2ab462f 100644
--- a/storm-core/src/jvm/backtype/storm/generated/WorkerResources.java
+++ b/storm-core/src/jvm/backtype/storm/generated/WorkerResources.java
@@ -51,7 +51,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
-@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-10-9")
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-10-30")
public class WorkerResources implements org.apache.thrift.TBase<WorkerResources, WorkerResources._Fields>, java.io.Serializable, Cloneable, Comparable<WorkerResources> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("WorkerResources");
http://git-wip-us.apache.org/repos/asf/storm/blob/444ec05e/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
index 2149c0d..7ecd770 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
@@ -17,6 +17,17 @@
*/
package backtype.storm.messaging.netty;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.Iterator;
+import java.util.Collection;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.lang.InterruptedException;
+
import backtype.storm.Config;
import backtype.storm.messaging.ConnectionWithStatus;
import backtype.storm.messaging.TaskMessage;
@@ -34,17 +45,11 @@ import org.jboss.netty.util.TimerTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
+
import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
import java.util.List;
-import java.util.Map;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
import static com.google.common.base.Preconditions.checkState;
@@ -60,7 +65,7 @@ import static com.google.common.base.Preconditions.checkState;
* - Note: The current implementation drops any messages that are being enqueued for sending if the connection to
* the remote destination is currently unavailable.
*/
-public class Client extends ConnectionWithStatus implements IStatefulObject {
+public class Client extends ConnectionWithStatus implements IStatefulObject, ISaslClient {
private static final long PENDING_MESSAGES_FLUSH_TIMEOUT_MS = 600000L;
private static final long PENDING_MESSAGES_FLUSH_INTERVAL_MS = 1000L;
@@ -73,7 +78,7 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
private final ClientBootstrap bootstrap;
private final InetSocketAddress dstAddress;
protected final String dstAddressPrefixedName;
-
+
/**
* The channel used for all write operations from this client to the remote destination.
*/
@@ -104,6 +109,10 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
*/
private final AtomicLong pendingMessages = new AtomicLong(0);
+ /**
+ * Whether the SASL channel is ready.
+ */
+ private final AtomicBoolean saslChannelReady = new AtomicBoolean(false);
/**
* This flag is set to true if and only if a client instance is being closed.
@@ -125,6 +134,8 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
this.scheduler = scheduler;
this.context = context;
int bufferSize = Utils.getInt(stormConf.get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE));
+ // if SASL authentication is disabled, saslChannelReady is initialized as true; otherwise false
+ saslChannelReady.set(!Utils.getBoolean(stormConf.get(Config.STORM_MESSAGING_NETTY_AUTHENTICATION), false));
LOG.info("creating Netty Client, connecting to {}:{}, bufferSize: {}", host, port, bufferSize);
int messageBatchSize = Utils.getInt(stormConf.get(Config.STORM_NETTY_MESSAGE_BATCH_SIZE), 262144);
@@ -134,19 +145,19 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
retryPolicy = new StormBoundedExponentialBackoffRetry(minWaitMs, maxWaitMs, maxReconnectionAttempts);
// Initiate connection to remote destination
- bootstrap = createClientBootstrap(factory, bufferSize);
+ bootstrap = createClientBootstrap(factory, bufferSize, stormConf);
dstAddress = new InetSocketAddress(host, port);
dstAddressPrefixedName = prefixedName(dstAddress);
scheduleConnect(NO_DELAY_MS);
batcher = new MessageBuffer(messageBatchSize);
}
- private ClientBootstrap createClientBootstrap(ChannelFactory factory, int bufferSize) {
+ private ClientBootstrap createClientBootstrap(ChannelFactory factory, int bufferSize, Map stormConf) {
ClientBootstrap bootstrap = new ClientBootstrap(factory);
bootstrap.setOption("tcpNoDelay", true);
bootstrap.setOption("sendBufferSize", bufferSize);
bootstrap.setOption("keepAlive", true);
- bootstrap.setPipelineFactory(new StormClientPipelineFactory(this));
+ bootstrap.setPipelineFactory(new StormClientPipelineFactory(this, stormConf));
return bootstrap;
}
@@ -158,7 +169,7 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
}
/**
- * We will retry connection with exponential back-off policy
+ * Enqueue a task message to be sent to server
*/
private void scheduleConnect(long delayMs) {
scheduler.newTimeout(new Connect(dstAddress), delayMs, TimeUnit.MILLISECONDS);
@@ -190,7 +201,11 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
} else if (!connectionEstablished(channelRef.get())) {
return Status.Connecting;
} else {
- return Status.Ready;
+ if (saslChannelReady.get()) {
+ return Status.Ready;
+ } else {
+ return Status.Connecting; // need to wait until sasl channel is also ready
+ }
}
}
@@ -202,7 +217,7 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
@Override
public Iterator<TaskMessage> recv(int flags, int clientId) {
throw new UnsupportedOperationException("Client connection should not receive any messages");
- }
+ }
@Override
public void send(int taskId, byte[] payload) {
@@ -225,7 +240,7 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
}
if (!hasMessages(msgs)) {
- return;
+ return;
}
Channel channel = getConnectedChannel();
@@ -266,7 +281,7 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
// We can rely on `notifyInterestChanged` to push these messages as soon as there is spece in Netty's buffer
// because we know `Channel.isWritable` was false after the messages were already in the buffer.
}
- }
+ }
private Channel getConnectedChannel() {
Channel channel = channelRef.get();
@@ -281,6 +296,10 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
}
return null;
}
+ }
+
+ public InetSocketAddress getDstAddress() {
+ return dstAddress;
}
private boolean hasMessages(Iterator<TaskMessage> msgs) {
@@ -292,7 +311,7 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
// We consume the iterator by traversing and thus "emptying" it.
int msgCount = iteratorSize(msgs);
messagesLost.getAndAdd(msgCount);
- }
+ }
private int iteratorSize(Iterator<TaskMessage> msgs) {
int size = 0;
@@ -300,8 +319,8 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
while (msgs.hasNext()) {
size++;
msgs.next();
+ }
}
- }
return size;
}
@@ -335,7 +354,7 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
}
});
- }
+ }
/**
* Schedule a reconnect if we closed a non-null channel, and acquired the right to
@@ -375,7 +394,7 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
long totalPendingMsgs = pendingMessages.get();
long startMs = System.currentTimeMillis();
while (pendingMessages.get() != 0) {
- try {
+ try {
long deltaMs = System.currentTimeMillis() - startMs;
if (deltaMs > PENDING_MESSAGES_FLUSH_TIMEOUT_MS) {
LOG.error("failed to send all pending messages to {} within timeout, {} of {} messages were not " +
@@ -386,11 +405,10 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
}
catch (InterruptedException e) {
break;
- }
}
-
}
+ }
private void closeChannel() {
Channel channel = channelRef.get();
@@ -402,7 +420,7 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
@Override
public Object getState() {
- LOG.info("Getting metrics for client connection to {}", dstAddressPrefixedName);
+ LOG.debug("Getting metrics for client connection to {}", dstAddressPrefixedName);
HashMap<String, Object> ret = new HashMap<String, Object>();
ret.put("reconnects", totalConnectionAttempts.getAndSet(0));
ret.put("sent", messagesSent.getAndSet(0));
@@ -416,10 +434,28 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
return ret;
}
- public Map getStormConf() {
+ public Map getConfig() {
return stormConf;
}
+ /** ISaslClient interface **/
+ public void channelConnected(Channel channel) {
+// setChannel(channel);
+ }
+
+ public void channelReady() {
+ saslChannelReady.set(true);
+ }
+
+ public String name() {
+ return (String)stormConf.get(Config.TOPOLOGY_NAME);
+ }
+
+ public String secretKey() {
+ return SaslUtils.getSecretKey(stormConf);
+ }
+ /** end **/
+
private String srcAddressName() {
String name = null;
Channel channel = channelRef.get();
@@ -495,7 +531,7 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
connectionAttempt);
if (messagesLost.get() > 0) {
LOG.warn("Re-connection to {} was successful but {} messages has been lost so far", address.toString(), messagesLost.get());
- }
+ }
} else {
Throwable cause = future.getCause();
reschedule(cause);
@@ -510,8 +546,7 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
throw new RuntimeException("Giving up to scheduleConnect to " + dstAddressPrefixedName + " after " +
connectionAttempts + " failed attempts. " + messagesLost.get() + " messages were lost");
- }
+ }
}
}
-
}
http://git-wip-us.apache.org/repos/asf/storm/blob/444ec05e/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
index 5d27a16..10c5059 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
@@ -21,10 +21,9 @@ import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.jboss.netty.util.HashedWheelTimer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
import java.util.HashMap;
import java.util.Map;
@@ -55,6 +54,7 @@ public class Context implements IContext {
int maxWorkers = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_CLIENT_WORKER_THREADS));
ThreadFactory bossFactory = new NettyRenameThreadFactory("client" + "-boss");
ThreadFactory workerFactory = new NettyRenameThreadFactory("client" + "-worker");
+ // TODO investigate impact of having one worker
if (maxWorkers > 0) {
clientChannelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(bossFactory),
Executors.newCachedThreadPool(workerFactory), maxWorkers);
@@ -103,12 +103,10 @@ public class Context implements IContext {
for (IConnection conn : connections.values()) {
conn.close();
}
-
connections = null;
//we need to release resources associated with client channel factory
clientChannelFactory.releaseExternalResources();
-
}
private String key(String host, int port) {
http://git-wip-us.apache.org/repos/asf/storm/blob/444ec05e/storm-core/src/jvm/backtype/storm/messaging/netty/ControlMessage.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/ControlMessage.java b/storm-core/src/jvm/backtype/storm/messaging/netty/ControlMessage.java
index fb3efe6..bffd953 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/ControlMessage.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/ControlMessage.java
@@ -23,7 +23,7 @@ import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBufferOutputStream;
import org.jboss.netty.buffer.ChannelBuffers;
-enum ControlMessage {
+public enum ControlMessage implements INettySerializable {
CLOSE_MESSAGE((short)-100),
EOB_MESSAGE((short)-201),
OK_RESPONSE((short)-200),
@@ -43,14 +43,14 @@ enum ControlMessage {
* @param encoded
* @return
*/
- static ControlMessage mkMessage(short encoded) {
+ public static ControlMessage mkMessage(short encoded) {
for(ControlMessage cm: ControlMessage.values()) {
if(encoded == cm.code) return cm;
}
return null;
}
- int encodeLength() {
+ public int encodeLength() {
return 2; //short
}
@@ -58,14 +58,19 @@ enum ControlMessage {
* encode the current Control Message into a channel buffer
* @throws Exception
*/
- ChannelBuffer buffer() throws IOException {
+ public ChannelBuffer buffer() throws IOException {
ChannelBufferOutputStream bout = new ChannelBufferOutputStream(ChannelBuffers.directBuffer(encodeLength()));
write(bout);
bout.close();
return bout.buffer();
}
- void write(ChannelBufferOutputStream bout) throws IOException {
+ public static ControlMessage read(byte[] serial) {
+ ChannelBuffer cm_buffer = ChannelBuffers.copiedBuffer(serial);
+ return mkMessage(cm_buffer.getShort(0));
+ }
+
+ public void write(ChannelBufferOutputStream bout) throws IOException {
bout.writeShort(code);
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/444ec05e/storm-core/src/jvm/backtype/storm/messaging/netty/INettySerializable.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/INettySerializable.java b/storm-core/src/jvm/backtype/storm/messaging/netty/INettySerializable.java
new file mode 100644
index 0000000..945e6e9
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/INettySerializable.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.messaging.netty;
+
+import java.io.IOException;
+import org.jboss.netty.buffer.ChannelBuffer;
+
+public interface INettySerializable {
+ public ChannelBuffer buffer() throws IOException;
+ public int encodeLength();
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/444ec05e/storm-core/src/jvm/backtype/storm/messaging/netty/ISaslClient.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/ISaslClient.java b/storm-core/src/jvm/backtype/storm/messaging/netty/ISaslClient.java
new file mode 100644
index 0000000..57dcfe8
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/ISaslClient.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.messaging.netty;
+
+import org.jboss.netty.channel.Channel;
+import backtype.storm.Config;
+
+public interface ISaslClient {
+ void channelConnected(Channel channel);
+ void channelReady();
+ String name();
+ String secretKey();
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/444ec05e/storm-core/src/jvm/backtype/storm/messaging/netty/ISaslServer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/ISaslServer.java b/storm-core/src/jvm/backtype/storm/messaging/netty/ISaslServer.java
new file mode 100644
index 0000000..4203dcc
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/ISaslServer.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.messaging.netty;
+
+import org.jboss.netty.channel.Channel;
+
+public interface ISaslServer extends IServer {
+ String name();
+ String secretKey();
+ void authenticated(Channel c);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/444ec05e/storm-core/src/jvm/backtype/storm/messaging/netty/IServer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/IServer.java b/storm-core/src/jvm/backtype/storm/messaging/netty/IServer.java
new file mode 100644
index 0000000..d046492
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/IServer.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.messaging.netty;
+
+import org.jboss.netty.channel.Channel;
+
+public interface IServer {
+ void channelConnected(Channel c);
+ void received(Object message, String remote, Channel channel) throws InterruptedException;
+ void closeChannel(Channel c);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/444ec05e/storm-core/src/jvm/backtype/storm/messaging/netty/KerberosSaslClientHandler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/KerberosSaslClientHandler.java b/storm-core/src/jvm/backtype/storm/messaging/netty/KerberosSaslClientHandler.java
new file mode 100644
index 0000000..9ae34fe
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/KerberosSaslClientHandler.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.messaging.netty;
+
+import java.io.IOException;
+import java.util.Map;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KerberosSaslClientHandler extends SimpleChannelUpstreamHandler {
+
+ private static final Logger LOG = LoggerFactory
+ .getLogger(KerberosSaslClientHandler.class);
+ private ISaslClient client;
+ long start_time;
+ /** Used for client or server's token to send or receive from each other. */
+ private Map storm_conf;
+ private String jaas_section;
+
+ public KerberosSaslClientHandler(ISaslClient client, Map storm_conf, String jaas_section) throws IOException {
+ this.client = client;
+ this.storm_conf = storm_conf;
+ this.jaas_section = jaas_section;
+ start_time = System.currentTimeMillis();
+ }
+
+ @Override
+ public void channelConnected(ChannelHandlerContext ctx,
+ ChannelStateEvent event) {
+ // register the newly established channel
+ Channel channel = ctx.getChannel();
+ client.channelConnected(channel);
+
+ LOG.info("Connection established from {} to {}",
+ channel.getLocalAddress(), channel.getRemoteAddress());
+
+ try {
+ KerberosSaslNettyClient saslNettyClient = KerberosSaslNettyClientState.getKerberosSaslNettyClient
+ .get(channel);
+
+ if (saslNettyClient == null) {
+ LOG.debug("Creating saslNettyClient now for channel: {}",
+ channel);
+ saslNettyClient = new KerberosSaslNettyClient(storm_conf, jaas_section);
+ KerberosSaslNettyClientState.getKerberosSaslNettyClient.set(channel,
+ saslNettyClient);
+ }
+ LOG.debug("Going to initiate Kerberos negotiations.");
+ byte[] initialChallenge = saslNettyClient.saslResponse(new SaslMessageToken(new byte[0]));
+ LOG.debug("Sending initial challenge: {}", initialChallenge);
+ channel.write(new SaslMessageToken(initialChallenge));
+ } catch (Exception e) {
+ LOG.error("Failed to authenticate with server due to error: ",
+ e);
+ }
+ return;
+
+ }
+
+ @Override
+ public void messageReceived(ChannelHandlerContext ctx, MessageEvent event)
+ throws Exception {
+ LOG.debug("send/recv time (ms): {}",
+ (System.currentTimeMillis() - start_time));
+
+ Channel channel = ctx.getChannel();
+
+ // Generate SASL response to server using Channel-local SASL client.
+ KerberosSaslNettyClient saslNettyClient = KerberosSaslNettyClientState.getKerberosSaslNettyClient
+ .get(channel);
+ if (saslNettyClient == null) {
+ throw new Exception("saslNettyClient was unexpectedly null for channel:" + channel);
+ }
+
+ // examine the response message from server
+ if (event.getMessage() instanceof ControlMessage) {
+ ControlMessage msg = (ControlMessage) event.getMessage();
+ if (msg == ControlMessage.SASL_COMPLETE_REQUEST) {
+ LOG.debug("Server has sent us the SaslComplete message. Allowing normal work to proceed.");
+
+ if (!saslNettyClient.isComplete()) {
+ String message = "Server returned a Sasl-complete message, but as far as we can tell, we are not authenticated yet.";
+ LOG.error(message);
+ throw new Exception(message);
+ }
+ ctx.getPipeline().remove(this);
+ this.client.channelReady();
+
+ // We call fireMessageReceived since the client is allowed to
+ // perform this request. The client's request will now proceed
+ // to the next pipeline component namely StormClientHandler.
+ Channels.fireMessageReceived(ctx, msg);
+ }
+ else {
+ LOG.warn("Unexpected control message: {}", msg);
+ }
+ return;
+ }
+ else if (event.getMessage() instanceof SaslMessageToken) {
+ SaslMessageToken saslTokenMessage = (SaslMessageToken) event
+ .getMessage();
+ LOG.debug("Responding to server's token of length: {}",
+ saslTokenMessage.getSaslToken().length);
+
+ // Generate SASL response (but we only actually send the response if
+ // it's non-null.
+ byte[] responseToServer = saslNettyClient
+ .saslResponse(saslTokenMessage);
+ if (responseToServer == null) {
+ // If we generate a null response, then authentication has completed
+ // (if not, warn), and return without sending a response back to the
+ // server.
+ LOG.debug("Response to server is null: authentication should now be complete.");
+ if (!saslNettyClient.isComplete()) {
+ LOG.warn("Generated a null response, but authentication is not complete.");
+ throw new Exception("Our reponse to the server is null, but as far as we can tell, we are not authenticated yet.");
+ }
+ this.client.channelReady();
+ return;
+ } else {
+ LOG.debug("Response to server token has length: {}",
+ responseToServer.length);
+ }
+ // Construct a message containing the SASL response and send it to the
+ // server.
+ SaslMessageToken saslResponse = new SaslMessageToken(responseToServer);
+ channel.write(saslResponse);
+ }
+ else {
+ LOG.error("Unexpected message from server: {}", event.getMessage());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/444ec05e/storm-core/src/jvm/backtype/storm/messaging/netty/KerberosSaslNettyClient.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/KerberosSaslNettyClient.java b/storm-core/src/jvm/backtype/storm/messaging/netty/KerberosSaslNettyClient.java
new file mode 100644
index 0000000..32afab0
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/KerberosSaslNettyClient.java
@@ -0,0 +1,212 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.messaging.netty;
+
+import backtype.storm.Config;
+import backtype.storm.security.auth.AuthUtils;
+import java.io.IOException;
+import java.security.Principal;
+import java.security.PrivilegedActionException;
+import java.security.PrivilegedExceptionAction;
+import java.util.Map;
+import java.util.TreeMap;
+import javax.security.auth.Subject;
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.auth.kerberos.KerberosTicket;
+import javax.security.auth.login.Configuration;
+import javax.security.auth.login.LoginException;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+import org.apache.zookeeper.Login;
+import org.apache.zookeeper.server.auth.KerberosName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implements SASL logic for storm worker client processes.
+ */
+public class KerberosSaslNettyClient {
+
+ private static final Logger LOG = LoggerFactory
+ .getLogger(KerberosSaslNettyClient.class);
+
+ /**
+ * Used to respond to server's counterpart, SaslServer with SASL tokens
+ * represented as byte arrays.
+ */
+ private SaslClient saslClient;
+ private Subject subject;
+ private String jaas_section;
+
+ /**
+ * Create a KerberosSaslNettyClient for authentication with servers.
+ */
+ public KerberosSaslNettyClient(Map storm_conf, String jaas_section) {
+ LOG.debug("KerberosSaslNettyClient: Creating SASL {} client to authenticate to server ",
+ SaslUtils.KERBEROS);
+
+ LOG.info("Creating Kerberos Client.");
+
+ Configuration login_conf;
+ try {
+ login_conf = AuthUtils.GetConfiguration(storm_conf);
+ }
+ catch (Throwable t) {
+ LOG.error("Failed to get login_conf: ", t);
+ throw t;
+ }
+ LOG.debug("KerberosSaslNettyClient: authmethod {}", SaslUtils.KERBEROS);
+
+ SaslClientCallbackHandler ch = new SaslClientCallbackHandler();
+
+ subject = null;
+ try {
+ LOG.debug("Setting Configuration to login_config: {}", login_conf);
+ //specify a configuration object to be used
+ Configuration.setConfiguration(login_conf);
+ //now login
+ LOG.debug("Trying to login.");
+ Login login = new Login(jaas_section, ch);
+ subject = login.getSubject();
+ LOG.debug("Got Subject: {}", subject.toString());
+ } catch (LoginException ex) {
+ LOG.error("Client failed to login in principal:" + ex, ex);
+ throw new RuntimeException(ex);
+ }
+
+ //check the credential of our principal
+ if (subject.getPrivateCredentials(KerberosTicket.class).isEmpty()) {
+ LOG.error("Failed to verify user principal.");
+ throw new RuntimeException("Fail to verify user principal with section \"" +
+ jaas_section +
+ "\" in login configuration file " +
+ login_conf);
+ }
+
+ String serviceName = null;
+ try {
+ serviceName = AuthUtils.get(login_conf, jaas_section, "serviceName");
+ }
+ catch (IOException e) {
+ LOG.error("Failed to get service name.", e);
+ throw new RuntimeException(e);
+ }
+
+ try {
+ Principal principal = (Principal)subject.getPrincipals().toArray()[0];
+ final String fPrincipalName = principal.getName();
+ KerberosName kerbName = new KerberosName(principal.getName());
+ final String fHost = (String)storm_conf.get(Config.PACEMAKER_HOST);
+ final String fServiceName = serviceName;
+ final CallbackHandler fch = ch;
+ LOG.debug("Kerberos Client with principal: {}, host: {}", fPrincipalName, fHost);
+ saslClient = Subject.doAs(subject, new PrivilegedExceptionAction<SaslClient>() {
+ public SaslClient run() {
+ try {
+ Map<String, String> props = new TreeMap<String,String>();
+ props.put(Sasl.QOP, "auth");
+ props.put(Sasl.SERVER_AUTH, "false");
+ return Sasl.createSaslClient(
+ new String[] { SaslUtils.KERBEROS },
+ fPrincipalName,
+ fServiceName,
+ fHost,
+ props, fch);
+ }
+ catch (Exception e) {
+ LOG.error("Subject failed to create sasl client.", e);
+ return null;
+ }
+ }
+ });
+ LOG.info("Got Client: {}", saslClient);
+
+ } catch (PrivilegedActionException e) {
+ LOG.error("KerberosSaslNettyClient: Could not create Sasl Netty Client.");
+ throw new RuntimeException(e);
+ }
+}
+
+ public boolean isComplete() {
+ return saslClient.isComplete();
+ }
+
+ /**
+ * Respond to server's SASL token.
+ *
+ * @param saslTokenMessage
+ * contains server's SASL token
+ * @return client's response SASL token
+ */
+ public byte[] saslResponse(SaslMessageToken saslTokenMessage) {
+ try {
+ final SaslMessageToken fSaslTokenMessage = saslTokenMessage;
+ byte [] retval = Subject.doAs(subject, new PrivilegedExceptionAction<byte[]>() {
+ public byte[] run() {
+ try {
+ byte[] retval = saslClient.evaluateChallenge(fSaslTokenMessage
+ .getSaslToken());
+ return retval;
+ } catch (SaslException e) {
+ LOG.error("saslResponse: Failed to respond to SASL server's token:",
+ e);
+ throw new RuntimeException(e);
+ }
+ }
+ });
+ return retval;
+ }
+ catch (PrivilegedActionException e) {
+ LOG.error("Failed to generate response for token: ", e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Implementation of javax.security.auth.callback.CallbackHandler that works
+ * with Storm topology tokens.
+ */
+ private static class SaslClientCallbackHandler implements CallbackHandler {
+
+ /**
+ * Set private members using topology token.
+ *
+ * @param topologyToken
+ */
+ public SaslClientCallbackHandler() {
+ }
+
+ /**
+ * Implementation used to respond to SASL tokens from server.
+ *
+ * @param callbacks
+ * objects that indicate what credential information the
+ * server's SaslServer requires from the client.
+ * @throws UnsupportedCallbackException
+ */
+ public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
+ for (Callback callback : callbacks) {
+ LOG.info("Kerberos Client Callback Handler got callback: {}", callback.getClass());
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/444ec05e/storm-core/src/jvm/backtype/storm/messaging/netty/KerberosSaslNettyClientState.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/KerberosSaslNettyClientState.java b/storm-core/src/jvm/backtype/storm/messaging/netty/KerberosSaslNettyClientState.java
new file mode 100644
index 0000000..1283d9b
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/KerberosSaslNettyClientState.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.messaging.netty;
+
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelLocal;
+
+final class KerberosSaslNettyClientState {
+
+ public static final ChannelLocal<KerberosSaslNettyClient> getKerberosSaslNettyClient = new ChannelLocal<KerberosSaslNettyClient>() {
+ protected KerberosSaslNettyClient initialValue(Channel channel) {
+ return null;
+ }
+ };
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/444ec05e/storm-core/src/jvm/backtype/storm/messaging/netty/KerberosSaslNettyServer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/KerberosSaslNettyServer.java b/storm-core/src/jvm/backtype/storm/messaging/netty/KerberosSaslNettyServer.java
new file mode 100644
index 0000000..a0003c6
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/KerberosSaslNettyServer.java
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.messaging.netty;
+
+import backtype.storm.Config;
+import backtype.storm.security.auth.AuthUtils;
+import backtype.storm.security.auth.KerberosPrincipalToLocal;
+import java.io.IOException;
+import java.security.Principal;
+import java.security.PrivilegedActionException;
+import java.security.PrivilegedExceptionAction;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import javax.security.auth.Subject;
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.auth.kerberos.KerberosPrincipal;
+import javax.security.auth.kerberos.KerberosTicket;
+import javax.security.auth.login.Configuration;
+import javax.security.auth.login.LoginException;
+import javax.security.sasl.AuthorizeCallback;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
+import org.apache.zookeeper.Login;
+import org.apache.zookeeper.server.auth.KerberosName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+class KerberosSaslNettyServer {
+
+ private static final Logger LOG = LoggerFactory
+ .getLogger(KerberosSaslNettyServer.class);
+
+ private SaslServer saslServer;
+ private Subject subject;
+ private String jaas_section;
+ private List<String> authorizedUsers;
+
+ KerberosSaslNettyServer(Map storm_conf, String jaas_section, List<String> authorizedUsers) {
+ this.authorizedUsers = authorizedUsers;
+ LOG.debug("Getting Configuration.");
+ Configuration login_conf;
+ try {
+ login_conf = AuthUtils.GetConfiguration(storm_conf);
+ }
+ catch (Throwable t) {
+ LOG.error("Failed to get login_conf: ", t);
+ throw t;
+ }
+
+ LOG.debug("KerberosSaslNettyServer: authmethod {}", SaslUtils.KERBEROS);
+
+ KerberosSaslCallbackHandler ch = new KerberosSaslNettyServer.KerberosSaslCallbackHandler(storm_conf, authorizedUsers);
+
+ //login our principal
+ subject = null;
+ try {
+ LOG.debug("Setting Configuration to login_config: {}", login_conf);
+ //specify a configuration object to be used
+ Configuration.setConfiguration(login_conf);
+ //now login
+ LOG.debug("Trying to login.");
+ Login login = new Login(jaas_section, ch);
+ subject = login.getSubject();
+ LOG.debug("Got Subject: {}", subject.toString());
+ } catch (LoginException ex) {
+ LOG.error("Server failed to login in principal:", ex);
+ throw new RuntimeException(ex);
+ }
+
+ //check the credential of our principal
+ if (subject.getPrivateCredentials(KerberosTicket.class).isEmpty()) {
+ LOG.error("Failed to verifyuser principal.");
+ throw new RuntimeException("Fail to verify user principal with section \""
+ + jaas_section
+ + "\" in login configuration file "
+ + login_conf);
+ }
+
+ try {
+ LOG.info("Creating Kerberos Server.");
+ final CallbackHandler fch = ch;
+ Principal p = (Principal)subject.getPrincipals().toArray()[0];
+ KerberosName kName = new KerberosName(p.getName());
+ final String fHost = kName.getHostName();
+ final String fServiceName = kName.getServiceName();
+ LOG.debug("Server with host: {}", fHost);
+ saslServer =
+ Subject.doAs(subject, new PrivilegedExceptionAction<SaslServer>() {
+ public SaslServer run() {
+ try {
+ Map<String, String> props = new TreeMap<String,String>();
+ props.put(Sasl.QOP, "auth");
+ props.put(Sasl.SERVER_AUTH, "false");
+ return Sasl.createSaslServer(SaslUtils.KERBEROS,
+ fServiceName,
+ fHost, props, fch);
+ }
+ catch (Exception e) {
+ LOG.error("Subject failed to create sasl server.", e);
+ return null;
+ }
+ }
+ });
+ LOG.info("Got Server: {}", saslServer);
+
+ } catch (PrivilegedActionException e) {
+ LOG.error("KerberosSaslNettyServer: Could not create SaslServer: ", e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ public boolean isComplete() {
+ return saslServer.isComplete();
+ }
+
+ public String getUserName() {
+ return saslServer.getAuthorizationID();
+ }
+
+ private String getPrincipal(Subject subject) {
+ Set<Principal> principals = (Set<Principal>)subject.getPrincipals();
+ if (principals==null || principals.size()<1) {
+ LOG.info("No principal found in login subject");
+ return null;
+ }
+ return ((Principal)(principals.toArray()[0])).getName();
+ }
+
+ /** CallbackHandler for SASL DIGEST-MD5 mechanism */
+ public static class KerberosSaslCallbackHandler implements CallbackHandler {
+
+ /** Used to authenticate the clients */
+ private Map config;
+ private List<String> authorizedUsers;
+
+ public KerberosSaslCallbackHandler(Map config, List<String> authorizedUsers) {
+ LOG.debug("KerberosSaslCallback: Creating KerberosSaslCallback handler.");
+ this.config = config;
+ this.authorizedUsers = authorizedUsers;
+ }
+
+ @Override
+ public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
+ for (Callback callback : callbacks) {
+ LOG.info("Kerberos Callback Handler got callback: {}", callback.getClass());
+ if(callback instanceof AuthorizeCallback) {
+ AuthorizeCallback ac = (AuthorizeCallback)callback;
+ if(!ac.getAuthenticationID().equals(ac.getAuthorizationID())) {
+ LOG.debug("{} != {}", ac.getAuthenticationID(), ac.getAuthorizationID());
+ continue;
+ }
+
+ LOG.debug("Authorized Users: {}", authorizedUsers);
+ LOG.debug("Checking authorization for: {}", ac.getAuthorizationID());
+ for(String user : authorizedUsers) {
+ String requester = ac.getAuthorizationID();
+
+ KerberosPrincipal principal = new KerberosPrincipal(requester);
+ requester = new KerberosPrincipalToLocal().toLocal(principal);
+
+ if(requester.equals(user) ) {
+ ac.setAuthorized(true);
+ break;
+ }
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Used by SaslTokenMessage::processToken() to respond to server SASL
+ * tokens.
+ *
+ * @param token
+ * Server's SASL token
+ * @return token to send back to the server.
+ */
+ public byte[] response(final byte[] token) {
+ try {
+ byte [] retval = Subject.doAs(subject, new PrivilegedExceptionAction<byte[]>() {
+ public byte[] run(){
+ try {
+ LOG.debug("response: Responding to input token of length: {}",
+ token.length);
+ byte[] retval = saslServer.evaluateResponse(token);
+ return retval;
+ } catch (SaslException e) {
+ LOG.error("response: Failed to evaluate client token of length: {} : {}",
+ token.length, e);
+ throw new RuntimeException(e);
+ }
+ }
+ });
+ return retval;
+ }
+ catch (PrivilegedActionException e) {
+ LOG.error("Failed to generate response for token: ", e);
+ throw new RuntimeException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/444ec05e/storm-core/src/jvm/backtype/storm/messaging/netty/KerberosSaslNettyServerState.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/KerberosSaslNettyServerState.java b/storm-core/src/jvm/backtype/storm/messaging/netty/KerberosSaslNettyServerState.java
new file mode 100644
index 0000000..064dc91
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/KerberosSaslNettyServerState.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.messaging.netty;
+
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelLocal;
+
+final class KerberosSaslNettyServerState {
+
+ public static final ChannelLocal<KerberosSaslNettyServer> getKerberosSaslNettyServer = new ChannelLocal<KerberosSaslNettyServer>() {
+ protected KerberosSaslNettyServer initialValue(Channel channel) {
+ return null;
+ }
+ };
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/444ec05e/storm-core/src/jvm/backtype/storm/messaging/netty/KerberosSaslServerHandler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/KerberosSaslServerHandler.java b/storm-core/src/jvm/backtype/storm/messaging/netty/KerberosSaslServerHandler.java
new file mode 100644
index 0000000..3ed3fd7
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/KerberosSaslServerHandler.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.messaging.netty;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KerberosSaslServerHandler extends SimpleChannelUpstreamHandler {
+
+ ISaslServer server;
+ /** Used for client or server's token to send or receive from each other. */
+ private Map storm_conf;
+ private String jaas_section;
+ private List<String> authorizedUsers;
+
+ private static final Logger LOG = LoggerFactory
+ .getLogger(KerberosSaslServerHandler.class);
+
+ public KerberosSaslServerHandler(ISaslServer server, Map storm_conf, String jaas_section, List<String> authorizedUsers) throws IOException {
+ this.server = server;
+ this.storm_conf = storm_conf;
+ this.jaas_section = jaas_section;
+ this.authorizedUsers = authorizedUsers;
+ }
+
+ @Override
+ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
+ throws Exception {
+ Object msg = e.getMessage();
+ if (msg == null)
+ return;
+
+ Channel channel = ctx.getChannel();
+
+
+ if (msg instanceof SaslMessageToken) {
+ // initialize server-side SASL functionality, if we haven't yet
+ // (in which case we are looking at the first SASL message from the
+ // client).
+ try {
+ LOG.debug("Got SaslMessageToken!");
+
+ KerberosSaslNettyServer saslNettyServer = KerberosSaslNettyServerState.getKerberosSaslNettyServer
+ .get(channel);
+ if (saslNettyServer == null) {
+ LOG.debug("No saslNettyServer for {} yet; creating now, with topology token: ", channel);
+ try {
+ saslNettyServer = new KerberosSaslNettyServer(storm_conf, jaas_section, authorizedUsers);
+ } catch (RuntimeException ioe) {
+ LOG.error("Error occurred while creating saslNettyServer on server {} for client {}",
+ channel.getLocalAddress(), channel.getRemoteAddress());
+ saslNettyServer = null;
+ }
+
+ KerberosSaslNettyServerState.getKerberosSaslNettyServer.set(channel,
+ saslNettyServer);
+ } else {
+ LOG.debug("Found existing saslNettyServer on server: {} for client {}",
+ channel.getLocalAddress(), channel.getRemoteAddress());
+ }
+
+ byte[] responseBytes = saslNettyServer.response(((SaslMessageToken) msg)
+ .getSaslToken());
+
+ SaslMessageToken saslTokenMessageRequest = new SaslMessageToken(responseBytes);
+
+ if(saslTokenMessageRequest.getSaslToken() == null) {
+ channel.write(ControlMessage.SASL_COMPLETE_REQUEST);
+ }
+ else {
+ // Send response to client.
+ channel.write(saslTokenMessageRequest);
+ }
+
+ if (saslNettyServer.isComplete()) {
+ // If authentication of client is complete, we will also send a
+ // SASL-Complete message to the client.
+ LOG.info("SASL authentication is complete for client with username: {}",
+ saslNettyServer.getUserName());
+ channel.write(ControlMessage.SASL_COMPLETE_REQUEST);
+ LOG.debug("Removing SaslServerHandler from pipeline since SASL authentication is complete.");
+ ctx.getPipeline().remove(this);
+ server.authenticated(channel);
+ }
+ return;
+ }
+ catch (Exception ex) {
+ LOG.error("Failed to handle SaslMessageToken: ", ex);
+ throw ex;
+ }
+ } else {
+ // Client should not be sending other-than-SASL messages before
+ // SaslServerHandler has removed itself from the pipeline. Such
+ // non-SASL requests will be denied by the Authorize channel handler
+ // (the next handler upstream in the server pipeline) if SASL
+ // authentication has not completed.
+ LOG.warn("Sending upstream an unexpected non-SASL message : {}",
+ msg);
+ Channels.fireMessageReceived(ctx, msg);
+ }
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
+ if(server != null) server.closeChannel(e.getChannel());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/444ec05e/storm-core/src/jvm/backtype/storm/messaging/netty/MessageDecoder.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/MessageDecoder.java b/storm-core/src/jvm/backtype/storm/messaging/netty/MessageDecoder.java
index 7d8bf54..8c99e78 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/MessageDecoder.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/MessageDecoder.java
@@ -70,7 +70,7 @@ public class MessageDecoder extends FrameDecoder {
}
//case 2: SaslTokenMessageRequest
- if(code==-500) {
+ if(code == SaslMessageToken.IDENTIFIER) {
// Make sure that we have received at least an integer (length)
if (buf.readableBytes() < 4) {
//need more data
@@ -142,4 +142,4 @@ public class MessageDecoder extends FrameDecoder {
return ret;
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/444ec05e/storm-core/src/jvm/backtype/storm/messaging/netty/NettyRenameThreadFactory.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/NettyRenameThreadFactory.java b/storm-core/src/jvm/backtype/storm/messaging/netty/NettyRenameThreadFactory.java
index 3a91a58..2a1cdea 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/NettyRenameThreadFactory.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/NettyRenameThreadFactory.java
@@ -33,8 +33,9 @@ public class NettyRenameThreadFactory implements ThreadFactory {
final ThreadGroup group;
final AtomicInteger index = new AtomicInteger(1);
final String name;
+ static final NettyUncaughtExceptionHandler uncaughtExceptionHandler = new NettyUncaughtExceptionHandler();
- NettyRenameThreadFactory(String name) {
+ public NettyRenameThreadFactory(String name) {
SecurityManager s = System.getSecurityManager();
group = (s != null)? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
@@ -43,10 +44,13 @@ public class NettyRenameThreadFactory implements ThreadFactory {
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r, name + "-" + index.getAndIncrement(), 0);
- if (t.isDaemon())
+ if (t.isDaemon()) {
t.setDaemon(false);
- if (t.getPriority() != Thread.NORM_PRIORITY)
+ }
+ if (t.getPriority() != Thread.NORM_PRIORITY) {
t.setPriority(Thread.NORM_PRIORITY);
+ }
+ t.setUncaughtExceptionHandler(uncaughtExceptionHandler);
return t;
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/444ec05e/storm-core/src/jvm/backtype/storm/messaging/netty/NettyUncaughtExceptionHandler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/NettyUncaughtExceptionHandler.java b/storm-core/src/jvm/backtype/storm/messaging/netty/NettyUncaughtExceptionHandler.java
new file mode 100644
index 0000000..3d31544
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/NettyUncaughtExceptionHandler.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.messaging.netty;
+
+import backtype.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class NettyUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
+ private static final Logger LOG = LoggerFactory.getLogger(NettyUncaughtExceptionHandler.class);
+ @Override
+ public void uncaughtException(Thread t, Throwable e) {
+ try {
+ Utils.handleUncaughtException(e);
+ } catch (Error error) {
+ LOG.info("Received error in netty thread.. terminating server...");
+ Runtime.getRuntime().exit(1);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/444ec05e/storm-core/src/jvm/backtype/storm/messaging/netty/SaslMessageToken.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslMessageToken.java b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslMessageToken.java
index d0d3ca1..2fe5c2d 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/SaslMessageToken.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/SaslMessageToken.java
@@ -17,6 +17,7 @@
*/
package backtype.storm.messaging.netty;
+import java.io.IOException;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBufferOutputStream;
import org.jboss.netty.buffer.ChannelBuffers;
@@ -26,7 +27,10 @@ import org.slf4j.LoggerFactory;
/**
* Send and receive SASL tokens.
*/
-public class SaslMessageToken {
+
+public class SaslMessageToken implements INettySerializable {
+ public static final short IDENTIFIER = -500;
+
/** Class logger */
private static final Logger LOG = LoggerFactory
.getLogger(SaslMessageToken.class);
@@ -69,7 +73,8 @@ public class SaslMessageToken {
this.token = token;
}
- int encodeLength() {
+
+ public int encodeLength() {
return 2 + 4 + token.length;
}
@@ -80,15 +85,15 @@ public class SaslMessageToken {
*
* @throws Exception
*/
- ChannelBuffer buffer() throws Exception {
+ public ChannelBuffer buffer() throws IOException {
ChannelBufferOutputStream bout = new ChannelBufferOutputStream(
ChannelBuffers.directBuffer(encodeLength()));
- short identifier = -500;
int payload_len = 0;
if (token != null)
payload_len = token.length;
- bout.writeShort((short) identifier);
+
+ bout.writeShort(IDENTIFIER);
bout.writeInt((int) payload_len);
if (payload_len > 0) {
bout.write(token);
@@ -96,4 +101,16 @@ public class SaslMessageToken {
bout.close();
return bout.buffer();
}
+
+ public static SaslMessageToken read(byte[] serial) {
+ ChannelBuffer sm_buffer = ChannelBuffers.copiedBuffer(serial);
+ short identifier = sm_buffer.readShort();
+ int payload_len = sm_buffer.readInt();
+ if(identifier != -500) {
+ return null;
+ }
+ byte token[] = new byte[payload_len];
+ sm_buffer.readBytes(token, 0, payload_len);
+ return new SaslMessageToken(token);
+ }
}