You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2017/04/06 17:32:52 UTC
[03/52] [partial] storm git commit: STORM-2441 Break down
'storm-core' to extract client (worker) artifacts
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/grouping/ShuffleGrouping.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/grouping/ShuffleGrouping.java b/storm-client/src/jvm/org/apache/storm/grouping/ShuffleGrouping.java
new file mode 100644
index 0000000..ad8014c
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/grouping/ShuffleGrouping.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.grouping;
+
+import org.apache.storm.generated.GlobalStreamId;
+import org.apache.storm.task.WorkerTopologyContext;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Random;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class ShuffleGrouping implements CustomStreamGrouping, Serializable {
+ private Random random;
+ private ArrayList<List<Integer>> choices;
+ private AtomicInteger current;
+
+ @Override
+ public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
+ random = new Random();
+ choices = new ArrayList<List<Integer>>(targetTasks.size());
+ for (Integer i: targetTasks) {
+ choices.add(Arrays.asList(i));
+ }
+ Collections.shuffle(choices, random);
+ current = new AtomicInteger(0);
+ }
+
+ @Override
+ public List<Integer> chooseTasks(int taskId, List<Object> values) {
+ int rightNow;
+ int size = choices.size();
+ while (true) {
+ rightNow = current.incrementAndGet();
+ if (rightNow < size) {
+ return choices.get(rightNow);
+ } else if (rightNow == size) {
+ current.set(0);
+ return choices.get(0);
+ }
+ // rightNow > size: another thread reached the wrap-around first and we lost the race,
+ // so try again
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/hooks/BaseTaskHook.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/hooks/BaseTaskHook.java b/storm-client/src/jvm/org/apache/storm/hooks/BaseTaskHook.java
new file mode 100644
index 0000000..3eee8d8
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/hooks/BaseTaskHook.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hooks;
+
+import org.apache.storm.hooks.info.BoltAckInfo;
+import org.apache.storm.hooks.info.BoltExecuteInfo;
+import org.apache.storm.hooks.info.BoltFailInfo;
+import org.apache.storm.hooks.info.EmitInfo;
+import org.apache.storm.hooks.info.SpoutAckInfo;
+import org.apache.storm.hooks.info.SpoutFailInfo;
+import org.apache.storm.task.TopologyContext;
+import java.util.Map;
+
+public class BaseTaskHook implements ITaskHook {
+ @Override
+ public void prepare(Map conf, TopologyContext context) {
+ }
+
+ @Override
+ public void cleanup() {
+ }
+
+ @Override
+ public void emit(EmitInfo info) {
+ }
+
+ @Override
+ public void spoutAck(SpoutAckInfo info) {
+ }
+
+ @Override
+ public void spoutFail(SpoutFailInfo info) {
+ }
+
+ @Override
+ public void boltAck(BoltAckInfo info) {
+ }
+
+ @Override
+ public void boltFail(BoltFailInfo info) {
+ }
+
+ @Override
+ public void boltExecute(BoltExecuteInfo info) {
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/hooks/BaseWorkerHook.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/hooks/BaseWorkerHook.java b/storm-client/src/jvm/org/apache/storm/hooks/BaseWorkerHook.java
new file mode 100644
index 0000000..e178280
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/hooks/BaseWorkerHook.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hooks;
+
+import org.apache.storm.task.WorkerTopologyContext;
+
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * A BaseWorkerHook is a noop implementation of IWorkerHook. You
+ * may extend this class and implement any and/or all methods you
+ * need for your workers.
+ */
+public class BaseWorkerHook implements IWorkerHook, Serializable {
+ private static final long serialVersionUID = 2589466485198339529L;
+
+ /**
+ * This method is called when a worker is started
+ *
+ * @param stormConf The Storm configuration for this worker
+ * @param context This object can be used to get information about this worker's place within the topology
+ */
+ @Override
+ public void start(Map stormConf, WorkerTopologyContext context) {
+ // NOOP
+ }
+
+ /**
+ * This method is called right before a worker shuts down
+ */
+ @Override
+ public void shutdown() {
+ // NOOP
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/hooks/ITaskHook.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/hooks/ITaskHook.java b/storm-client/src/jvm/org/apache/storm/hooks/ITaskHook.java
new file mode 100644
index 0000000..519af0c
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/hooks/ITaskHook.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hooks;
+
+import org.apache.storm.hooks.info.BoltAckInfo;
+import org.apache.storm.hooks.info.BoltExecuteInfo;
+import org.apache.storm.hooks.info.SpoutFailInfo;
+import org.apache.storm.hooks.info.SpoutAckInfo;
+import org.apache.storm.hooks.info.EmitInfo;
+import org.apache.storm.hooks.info.BoltFailInfo;
+import org.apache.storm.task.TopologyContext;
+import java.util.Map;
+
+public interface ITaskHook {
+ void prepare(Map conf, TopologyContext context);
+ void cleanup();
+ void emit(EmitInfo info);
+ void spoutAck(SpoutAckInfo info);
+ void spoutFail(SpoutFailInfo info);
+ void boltExecute(BoltExecuteInfo info);
+ void boltAck(BoltAckInfo info);
+ void boltFail(BoltFailInfo info);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/hooks/IWorkerHook.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/hooks/IWorkerHook.java b/storm-client/src/jvm/org/apache/storm/hooks/IWorkerHook.java
new file mode 100644
index 0000000..5f49fe3
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/hooks/IWorkerHook.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hooks;
+
+import org.apache.storm.task.WorkerTopologyContext;
+
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * An IWorkerHook represents a topology component that can be executed
+ * when a worker starts, and when a worker shuts down. It can be useful
+ * when you want to execute operations before topology processing starts,
+ * or cleanup operations before your workers shut down.
+ */
+public interface IWorkerHook extends Serializable {
+ /**
+ * This method is called when a worker is started
+ *
+ * @param stormConf The Storm configuration for this worker
+ * @param context This object can be used to get information about this worker's place within the topology
+ */
+ void start(Map stormConf, WorkerTopologyContext context);
+
+ /**
+ * This method is called right before a worker shuts down
+ */
+ void shutdown();
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/hooks/SubmitterHookException.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/hooks/SubmitterHookException.java b/storm-client/src/jvm/org/apache/storm/hooks/SubmitterHookException.java
new file mode 100644
index 0000000..b7679a7
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/hooks/SubmitterHookException.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.storm.hooks;
+
+/**
+ * This Exception is thrown when a registered {@link org.apache.storm.ISubmitterHook} could not be initialized or invoked.
+ */
+public class SubmitterHookException extends RuntimeException {
+
+ public SubmitterHookException() {
+ }
+
+ public SubmitterHookException(String message) {
+ super(message);
+ }
+
+ public SubmitterHookException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public SubmitterHookException(Throwable cause) {
+ super(cause);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/hooks/info/BoltAckInfo.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/hooks/info/BoltAckInfo.java b/storm-client/src/jvm/org/apache/storm/hooks/info/BoltAckInfo.java
new file mode 100644
index 0000000..e6f4b11
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/hooks/info/BoltAckInfo.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hooks.info;
+
+import org.apache.storm.hooks.ITaskHook;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.tuple.Tuple;
+
+public class BoltAckInfo {
+ public Tuple tuple;
+ public int ackingTaskId;
+ public Long processLatencyMs; // null if it wasn't sampled
+
+ public BoltAckInfo(Tuple tuple, int ackingTaskId, Long processLatencyMs) {
+ this.tuple = tuple;
+ this.ackingTaskId = ackingTaskId;
+ this.processLatencyMs = processLatencyMs;
+ }
+
+ public void applyOn(TopologyContext topologyContext) {
+ for (ITaskHook hook : topologyContext.getHooks()) {
+ hook.boltAck(this);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/hooks/info/BoltExecuteInfo.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/hooks/info/BoltExecuteInfo.java b/storm-client/src/jvm/org/apache/storm/hooks/info/BoltExecuteInfo.java
new file mode 100644
index 0000000..73a7f33
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/hooks/info/BoltExecuteInfo.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hooks.info;
+
+import org.apache.storm.hooks.ITaskHook;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.tuple.Tuple;
+
+public class BoltExecuteInfo {
+ public Tuple tuple;
+ public int executingTaskId;
+ public Long executeLatencyMs; // null if it wasn't sampled
+
+ public BoltExecuteInfo(Tuple tuple, int executingTaskId, Long executeLatencyMs) {
+ this.tuple = tuple;
+ this.executingTaskId = executingTaskId;
+ this.executeLatencyMs = executeLatencyMs;
+ }
+
+ public void applyOn(TopologyContext topologyContext) {
+ for (ITaskHook hook : topologyContext.getHooks()) {
+ hook.boltExecute(this);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/hooks/info/BoltFailInfo.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/hooks/info/BoltFailInfo.java b/storm-client/src/jvm/org/apache/storm/hooks/info/BoltFailInfo.java
new file mode 100644
index 0000000..4e1e32d
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/hooks/info/BoltFailInfo.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hooks.info;
+
+import org.apache.storm.hooks.ITaskHook;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.tuple.Tuple;
+
+public class BoltFailInfo {
+ public Tuple tuple;
+ public int failingTaskId;
+ public Long failLatencyMs; // null if it wasn't sampled
+
+ public BoltFailInfo(Tuple tuple, int failingTaskId, Long failLatencyMs) {
+ this.tuple = tuple;
+ this.failingTaskId = failingTaskId;
+ this.failLatencyMs = failLatencyMs;
+ }
+
+ public void applyOn(TopologyContext topologyContext) {
+ for (ITaskHook hook : topologyContext.getHooks()) {
+ hook.boltFail(this);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/hooks/info/EmitInfo.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/hooks/info/EmitInfo.java b/storm-client/src/jvm/org/apache/storm/hooks/info/EmitInfo.java
new file mode 100644
index 0000000..52965a1
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/hooks/info/EmitInfo.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hooks.info;
+
+import org.apache.storm.hooks.ITaskHook;
+import org.apache.storm.task.TopologyContext;
+
+import java.util.Collection;
+import java.util.List;
+
+public class EmitInfo {
+ public List<Object> values;
+ public String stream;
+ public int taskId;
+ public Collection<Integer> outTasks;
+
+ public EmitInfo(List<Object> values, String stream, int taskId, Collection<Integer> outTasks) {
+ this.values = values;
+ this.stream = stream;
+ this.taskId = taskId;
+ this.outTasks = outTasks;
+ }
+
+ public void applyOn(TopologyContext topologyContext) {
+ for (ITaskHook hook : topologyContext.getHooks()) {
+ hook.emit(this);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/hooks/info/SpoutAckInfo.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/hooks/info/SpoutAckInfo.java b/storm-client/src/jvm/org/apache/storm/hooks/info/SpoutAckInfo.java
new file mode 100644
index 0000000..4949f0f
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/hooks/info/SpoutAckInfo.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hooks.info;
+
+import org.apache.storm.hooks.ITaskHook;
+import org.apache.storm.task.TopologyContext;
+
+public class SpoutAckInfo {
+ public Object messageId;
+ public int spoutTaskId;
+ public Long completeLatencyMs; // null if it wasn't sampled
+
+ public SpoutAckInfo(Object messageId, int spoutTaskId, Long completeLatencyMs) {
+ this.messageId = messageId;
+ this.spoutTaskId = spoutTaskId;
+ this.completeLatencyMs = completeLatencyMs;
+ }
+
+ public void applyOn(TopologyContext topologyContext) {
+ for (ITaskHook hook : topologyContext.getHooks()) {
+ hook.spoutAck(this);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/hooks/info/SpoutFailInfo.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/hooks/info/SpoutFailInfo.java b/storm-client/src/jvm/org/apache/storm/hooks/info/SpoutFailInfo.java
new file mode 100644
index 0000000..5b40005
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/hooks/info/SpoutFailInfo.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hooks.info;
+
+import org.apache.storm.hooks.ITaskHook;
+import org.apache.storm.task.TopologyContext;
+
+public class SpoutFailInfo {
+ public Object messageId;
+ public int spoutTaskId;
+ public Long failLatencyMs; // null if it wasn't sampled
+
+ public SpoutFailInfo(Object messageId, int spoutTaskId, Long failLatencyMs) {
+ this.messageId = messageId;
+ this.spoutTaskId = spoutTaskId;
+ this.failLatencyMs = failLatencyMs;
+ }
+
+ public void applyOn(TopologyContext topologyContext) {
+ for (ITaskHook hook : topologyContext.getHooks()) {
+ hook.spoutFail(this);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/messaging/ConnectionWithStatus.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/ConnectionWithStatus.java b/storm-client/src/jvm/org/apache/storm/messaging/ConnectionWithStatus.java
new file mode 100644
index 0000000..ba547ac
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/messaging/ConnectionWithStatus.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.messaging;
+
+public abstract class ConnectionWithStatus implements IConnection {
+
+ public static enum Status {
+
+ /**
+ * We are establishing an active connection with the target host. New data
+ * sending requests can be buffered for future sending, or dropped (e.g. when
+ * there is not enough memory). This varies between different IConnection
+ * implementations.
+ */
+ Connecting,
+
+ /**
+ * We have a live connection channel, which can be used to transfer data.
+ */
+ Ready,
+
+ /**
+ * The connection channel is closed or being closed. We don't accept further
+ * data sending or receiving. All data sending requests will be dropped.
+ */
+ Closed
+ }
+
+ /**
+ * Returns the status of this connection, indicating whether it is available to transfer data
+ */
+ public abstract Status status();
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/messaging/DeserializingConnectionCallback.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/DeserializingConnectionCallback.java b/storm-client/src/jvm/org/apache/storm/messaging/DeserializingConnectionCallback.java
new file mode 100644
index 0000000..8f415c2
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/messaging/DeserializingConnectionCallback.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.messaging;
+
+import org.apache.storm.daemon.worker.WorkerState;
+import org.apache.storm.task.GeneralTopologyContext;
+import org.apache.storm.tuple.AddressedTuple;
+import org.apache.storm.serialization.KryoTupleDeserializer;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A class that is called when a TaskMessage arrives.
+ */
+public class DeserializingConnectionCallback implements IConnectionCallback {
+ private final WorkerState.ILocalTransferCallback _cb;
+ private final Map _conf;
+ private final GeneralTopologyContext _context;
+ private final ThreadLocal<KryoTupleDeserializer> _des =
+ new ThreadLocal<KryoTupleDeserializer>() {
+ @Override
+ protected KryoTupleDeserializer initialValue() {
+ return new KryoTupleDeserializer(_conf, _context);
+ }
+ };
+
+ public DeserializingConnectionCallback(final Map conf, final GeneralTopologyContext context, WorkerState.ILocalTransferCallback callback) {
+ _conf = conf;
+ _context = context;
+ _cb = callback;
+ }
+
+ @Override
+ public void recv(List<TaskMessage> batch) {
+ KryoTupleDeserializer des = _des.get();
+ ArrayList<AddressedTuple> ret = new ArrayList<>(batch.size());
+ for (TaskMessage message: batch) {
+ ret.add(new AddressedTuple(message.task(), des.deserialize(message.message())));
+ }
+ _cb.transfer(ret);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/messaging/IConnection.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/IConnection.java b/storm-client/src/jvm/org/apache/storm/messaging/IConnection.java
new file mode 100644
index 0000000..7042dc3
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/messaging/IConnection.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.messaging;
+
+import org.apache.storm.grouping.Load;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+
+public interface IConnection {
+ /**
+ * Register a callback to be notified when data is ready to be processed.
+ * @param cb the callback to process the messages.
+ */
+ public void registerRecv(IConnectionCallback cb);
+
+ /**
+ * Send load metrics to all downstream connections.
+ * @param taskToLoad a map from the task id to the load for that task.
+ */
+ public void sendLoadMetrics(Map<Integer, Double> taskToLoad);
+
+ /**
+ * send a message with taskId and payload
+ * @param taskId task ID
+ * @param payload the message bytes to send
+ */
+ public void send(int taskId, byte[] payload);
+
+ /**
+ * send batch messages
+ * @param msgs the messages to send
+ */
+
+ public void send(Iterator<TaskMessage> msgs);
+
+ /**
+ * Get the current load for the given tasks
+ * @param tasks the tasks to look for.
+ * @return a Load for each of the tasks it knows about.
+ */
+ public Map<Integer, Load> getLoad(Collection<Integer> tasks);
+
+ /**
+ * close this connection
+ */
+ public void close();
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/messaging/IConnectionCallback.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/IConnectionCallback.java b/storm-client/src/jvm/org/apache/storm/messaging/IConnectionCallback.java
new file mode 100644
index 0000000..3224f2e
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/messaging/IConnectionCallback.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.messaging;
+
+import java.util.List;
+
+/**
+ * Callback that is invoked when TaskMessages arrive on a connection.
+ */
+public interface IConnectionCallback {
+ /**
+ * A batch of new messages has arrived to be processed.
+ * @param batch the messages to be processed
+ */
+ public void recv(List<TaskMessage> batch);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/messaging/IContext.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/IContext.java b/storm-client/src/jvm/org/apache/storm/messaging/IContext.java
new file mode 100644
index 0000000..72812b1
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/messaging/IContext.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.messaging;
+
+import java.util.Map;
+
+/**
+ * This interface needs to be implemented by a messaging plugin.
+ *
+ * The messaging plugin is specified via the Storm config parameter storm.messaging.transport.
+ *
+ * A messaging plugin should have a default constructor and implement the IContext interface.
+ * Upon construction, we will invoke IContext::prepare(storm_conf) to enable the context to be configured
+ * according to the storm configuration.
+ */
+public interface IContext {
+ /**
+ * This method is invoked at the startup of the messaging plugin.
+ * @param storm_conf storm configuration
+ */
+ public void prepare(Map storm_conf);
+
+ /**
+ * This method is invoked when a worker unloads a messaging plugin.
+ */
+ public void term();
+
+ /**
+ * This method establishes a server side connection.
+ * @param storm_id topology ID
+ * @param port port #
+ * @return server side connection
+ */
+ public IConnection bind(String storm_id, int port);
+
+ /**
+ * This method establishes a client side connection to a remote server.
+ * @param storm_id topology ID
+ * @param host remote host
+ * @param port remote port
+ * @return client side connection
+ */
+ public IConnection connect(String storm_id, String host, int port);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/messaging/TaskMessage.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/TaskMessage.java b/storm-client/src/jvm/org/apache/storm/messaging/TaskMessage.java
new file mode 100644
index 0000000..98aba31
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/messaging/TaskMessage.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.messaging;
+
+import java.nio.ByteBuffer;
+
+/**
+ * A message payload addressed to a task id.
+ *
+ * The serialized form is a 2-byte task id followed by the raw message bytes. Because the id is
+ * written as a {@code short}, task ids outside the {@code short} range do not survive a
+ * serialize/deserialize round trip.
+ */
+public class TaskMessage {
+    private int _task;
+    private byte[] _message;
+
+    /**
+     * @param task id of the task this message is addressed to
+     * @param message the message bytes; stored by reference, not copied
+     */
+    public TaskMessage(int task, byte[] message) {
+        _task = task;
+        _message = message;
+    }
+
+    /** @return the task id of this message */
+    public int task() {
+        return _task;
+    }
+
+    /** @return the message bytes (the internal array, not a copy) */
+    public byte[] message() {
+        return _message;
+    }
+
+    /**
+     * Writes the task id (as a short) followed by the message bytes into a new buffer.
+     *
+     * @return the filled buffer; it is NOT flipped, so its position is at the end of the data
+     */
+    public ByteBuffer serialize() {
+        ByteBuffer bb = ByteBuffer.allocate(_message.length + 2);
+        bb.putShort((short) _task);
+        bb.put(_message);
+        return bb;
+    }
+
+    /**
+     * Replaces this message's task id and payload with the contents of {@code packet}, reading
+     * from the buffer's current position up to its limit. A {@code null} packet leaves this
+     * message unchanged.
+     *
+     * @param packet buffer positioned at the 2-byte task id, or {@code null}
+     */
+    public void deserialize(ByteBuffer packet) {
+        if (packet == null) {
+            return;
+        }
+        _task = packet.getShort();
+        // Size the payload from what is actually left to read, so a buffer whose position is
+        // not 0 is handled too (limit()-2 is only correct when the position starts at 0).
+        _message = new byte[packet.remaining()];
+        packet.get(_message);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/messaging/TransportFactory.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/TransportFactory.java b/storm-client/src/jvm/org/apache/storm/messaging/TransportFactory.java
new file mode 100644
index 0000000..511257d
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/messaging/TransportFactory.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.messaging;
+
+import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.lang.reflect.Method;
+import org.apache.storm.Config;
+
+public class TransportFactory {
+ public static final Logger LOG = LoggerFactory.getLogger(TransportFactory.class);
+
+ public static IContext makeContext(Map storm_conf) {
+
+ //get factory class name
+ String transport_plugin_klassName = (String)storm_conf.get(Config.STORM_MESSAGING_TRANSPORT);
+ LOG.info("Storm peer transport plugin:"+transport_plugin_klassName);
+
+ IContext transport;
+ try {
+ //create a factory class
+ Class klass = Class.forName(transport_plugin_klassName);
+ //obtain a context object
+ Object obj = klass.newInstance();
+ if (obj instanceof IContext) {
+ //case 1: plugin is a IContext class
+ transport = (IContext)obj;
+ //initialize with storm configuration
+ transport.prepare(storm_conf);
+ } else {
+ //case 2: Non-IContext plugin must have a makeContext(storm_conf) method that returns IContext object
+ Method method = klass.getMethod("makeContext", Map.class);
+ LOG.debug("object:"+obj+" method:"+method);
+ transport = (IContext) method.invoke(obj, storm_conf);
+ }
+ } catch(Exception e) {
+ throw new RuntimeException("Fail to construct messaging plugin from plugin "+transport_plugin_klassName, e);
+ }
+ return transport;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/messaging/local/Context.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/local/Context.java b/storm-client/src/jvm/org/apache/storm/messaging/local/Context.java
new file mode 100644
index 0000000..7300847
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/messaging/local/Context.java
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.messaging.local;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import org.apache.storm.grouping.Load;
+import org.apache.storm.messaging.IConnection;
+import org.apache.storm.messaging.TaskMessage;
+import org.apache.storm.messaging.IConnectionCallback;
+import org.apache.storm.messaging.IContext;
+
+public class Context implements IContext {
+ private static final Logger LOG = LoggerFactory.getLogger(Context.class);
+
+ private static class LocalServer implements IConnection {
+ volatile IConnectionCallback _cb;
+ final ConcurrentHashMap<Integer, Double> _load = new ConcurrentHashMap<>();
+
+ @Override
+ public void registerRecv(IConnectionCallback cb) {
+ _cb = cb;
+ }
+
+ @Override
+ public void send(int taskId, byte[] payload) {
+ throw new IllegalArgumentException("SHOULD NOT HAPPEN");
+ }
+
+ @Override
+ public void send(Iterator<TaskMessage> msgs) {
+ throw new IllegalArgumentException("SHOULD NOT HAPPEN");
+ }
+
+ @Override
+ public Map<Integer, Load> getLoad(Collection<Integer> tasks) {
+ Map<Integer, Load> ret = new HashMap<>();
+ for (Integer task : tasks) {
+ Double found = _load.get(task);
+ if (found != null) {
+ ret.put(task, new Load(true, found, 0));
+ }
+ }
+ return ret;
+ }
+
+ @Override
+ public void sendLoadMetrics(Map<Integer, Double> taskToLoad) {
+ _load.putAll(taskToLoad);
+ }
+
+ @Override
+ public void close() {
+ //NOOP
+ }
+ };
+
+ private static class LocalClient implements IConnection {
+ private final LocalServer _server;
+ //Messages sent before the server registered a callback
+ private final LinkedBlockingQueue<TaskMessage> _pendingDueToUnregisteredServer;
+ private final ScheduledExecutorService _pendingFlusher;
+
+ public LocalClient(LocalServer server) {
+ _server = server;
+ _pendingDueToUnregisteredServer = new LinkedBlockingQueue<>();
+ _pendingFlusher = Executors.newScheduledThreadPool(1, new ThreadFactory(){
+ @Override
+ public Thread newThread(Runnable runnable) {
+ Thread thread = new Thread(runnable);
+ thread.setName("LocalClientFlusher-" + thread.getId());
+ thread.setDaemon(true);
+ return thread;
+ }
+ });
+ _pendingFlusher.scheduleAtFixedRate(new Runnable(){
+ @Override
+ public void run(){
+ try {
+ //Ensure messages are flushed even if no more sends are performed
+ flushPending();
+ } catch (Throwable t) {
+ LOG.error("Uncaught throwable in pending message flusher thread, messages may be lost", t);
+ throw new RuntimeException(t);
+ }
+ }
+ }, 5, 5, TimeUnit.SECONDS);
+ }
+
+ @Override
+ public void registerRecv(IConnectionCallback cb) {
+ throw new IllegalArgumentException("SHOULD NOT HAPPEN");
+ }
+
+ private void flushPending(){
+ IConnectionCallback serverCb = _server._cb;
+ if (serverCb != null && !_pendingDueToUnregisteredServer.isEmpty()) {
+ ArrayList<TaskMessage> ret = new ArrayList<>();
+ _pendingDueToUnregisteredServer.drainTo(ret);
+ serverCb.recv(ret);
+ }
+ }
+
+ @Override
+ public void send(int taskId, byte[] payload) {
+ TaskMessage message = new TaskMessage(taskId, payload);
+ IConnectionCallback serverCb = _server._cb;
+ if (serverCb != null) {
+ flushPending();
+ serverCb.recv(Arrays.asList(message));
+ } else {
+ _pendingDueToUnregisteredServer.add(message);
+ }
+ }
+
+ @Override
+ public void send(Iterator<TaskMessage> msgs) {
+ IConnectionCallback serverCb = _server._cb;
+ if (serverCb != null) {
+ flushPending();
+ ArrayList<TaskMessage> ret = new ArrayList<>();
+ while (msgs.hasNext()) {
+ ret.add(msgs.next());
+ }
+ serverCb.recv(ret);
+ } else {
+ while(msgs.hasNext()){
+ _pendingDueToUnregisteredServer.add(msgs.next());
+ }
+ }
+ }
+
+ @Override
+ public Map<Integer, Load> getLoad(Collection<Integer> tasks) {
+ return _server.getLoad(tasks);
+ }
+
+ @Override
+ public void sendLoadMetrics(Map<Integer, Double> taskToLoad) {
+ _server.sendLoadMetrics(taskToLoad);
+ }
+
+ @Override
+ public void close() {
+ _pendingFlusher.shutdown();
+ try{
+ _pendingFlusher.awaitTermination(5, TimeUnit.SECONDS);
+ } catch (InterruptedException e){
+ throw new RuntimeException("Interrupted while awaiting flusher shutdown", e);
+ }
+ }
+ };
+
+ private static ConcurrentHashMap<String, LocalServer> _registry = new ConcurrentHashMap<>();
+ private static LocalServer getLocalServer(String nodeId, int port) {
+ String key = nodeId + "-" + port;
+ LocalServer ret = _registry.get(key);
+ if (ret == null) {
+ ret = new LocalServer();
+ LocalServer tmp = _registry.putIfAbsent(key, ret);
+ if (tmp != null) {
+ ret = tmp;
+ }
+ }
+ return ret;
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Override
+ public void prepare(Map storm_conf) {
+ //NOOP
+ }
+
+ @Override
+ public IConnection bind(String storm_id, int port) {
+ return getLocalServer(storm_id, port);
+ }
+
+ @Override
+ public IConnection connect(String storm_id, String host, int port) {
+ return new LocalClient(getLocalServer(storm_id, port));
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Override
+ public void term() {
+ //NOOP
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java
new file mode 100644
index 0000000..8463af6
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java
@@ -0,0 +1,612 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.messaging.netty;
+
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.Iterator;
+import java.util.Collection;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Timer;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.lang.InterruptedException;
+
+import org.apache.storm.Config;
+import org.apache.storm.grouping.Load;
+import org.apache.storm.messaging.ConnectionWithStatus;
+import org.apache.storm.messaging.TaskMessage;
+import org.apache.storm.messaging.IConnectionCallback;
+import org.apache.storm.metric.api.IStatefulObject;
+import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.StormBoundedExponentialBackoffRetry;
+import org.jboss.netty.bootstrap.ClientBootstrap;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
+import org.jboss.netty.util.HashedWheelTimer;
+import org.jboss.netty.util.Timeout;
+import org.jboss.netty.util.TimerTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static com.google.common.base.Preconditions.checkState;
+
+/**
+ * A Netty client for sending task messages to a remote destination (Netty server).
+ *
+ * Implementation details:
+ *
+ * - Sending messages, i.e. writing to the channel, is performed asynchronously.
+ * - Messages are sent in batches to optimize for network throughput at the expense of network latency. The message
+ * batch size is configurable.
+ * - Connecting and reconnecting are performed asynchronously.
+ * - Note: The current implementation drops any messages that are being enqueued for sending if the connection to
+ * the remote destination is currently unavailable.
+ */
+public class Client extends ConnectionWithStatus implements IStatefulObject, ISaslClient {
+ // Upper bound on how long waitForPendingMessagesToBeSent() waits for in-flight messages.
+ private static final long PENDING_MESSAGES_FLUSH_TIMEOUT_MS = 600000L;
+ // Sleep between two checks of the pending-message counter while waiting.
+ private static final long PENDING_MESSAGES_FLUSH_INTERVAL_MS = 1000L;
+
+ private static final Logger LOG = LoggerFactory.getLogger(Client.class);
+ // Prepended to the destination address to build dstAddressPrefixedName.
+ private static final String PREFIX = "Netty-Client-";
+ private static final long NO_DELAY_MS = 0L;
+ // Static, so one daemon timer is shared by every Client instance for the channel-alive checks.
+ private static final Timer timer = new Timer("Netty-ChannelAlive-Timer", true);
+
+ private final Map stormConf;
+ private final StormBoundedExponentialBackoffRetry retryPolicy;
+ private final ClientBootstrap bootstrap;
+ private final InetSocketAddress dstAddress;
+ protected final String dstAddressPrefixedName;
+ // Latest per-task load map handed to setLoadMetrics(); null until the first call.
+ private volatile Map<Integer, Double> serverLoad = null;
+
+ /**
+ * The channel used for all write operations from this client to the remote destination.
+ */
+ private final AtomicReference<Channel> channelRef = new AtomicReference<>();
+
+ /**
+ * Total number of connection attempts. Reset to 0 each time getState() reports it.
+ */
+ private final AtomicInteger totalConnectionAttempts = new AtomicInteger(0);
+
+ /**
+ * Number of connection attempts since the last disconnect.
+ */
+ private final AtomicInteger connectionAttempts = new AtomicInteger(0);
+
+ /**
+ * Number of messages successfully sent to the remote destination.
+ */
+ private final AtomicInteger messagesSent = new AtomicInteger(0);
+
+ /**
+ * Number of messages that could not be sent to the remote destination.
+ */
+ private final AtomicInteger messagesLost = new AtomicInteger(0);
+
+ /**
+ * Period, in milliseconds, of the timer task that checks for a connected channel
+ * in order to avoid loss of messages.
+ */
+ private final long CHANNEL_ALIVE_INTERVAL_MS = 30000L;
+
+ /**
+ * Number of messages buffered in memory.
+ */
+ private final AtomicLong pendingMessages = new AtomicLong(0);
+
+ /**
+ * Whether the SASL channel is ready.
+ */
+ private final AtomicBoolean saslChannelReady = new AtomicBoolean(false);
+
+ /**
+ * This flag is set to true if and only if a client instance is being closed.
+ */
+ private volatile boolean closing = false;
+
+ private final Context context;
+
+ private final HashedWheelTimer scheduler;
+
+ private final MessageBuffer batcher;
+
+ // Held while adding to or draining the batcher and writing the resulting batches.
+ private final Object writeLock = new Object();
+
+    /**
+     * Creates a client for {@code host:port}, starts the periodic channel-alive check and
+     * schedules the first connection attempt without delay.
+     *
+     * @param stormConf storm configuration the Netty settings are read from
+     * @param factory channel factory used to build the client bootstrap
+     * @param scheduler timer on which connection attempts are scheduled
+     * @param host destination host
+     * @param port destination port
+     * @param context the context that owns this client; notified when the client is closed
+     */
+    @SuppressWarnings("rawtypes")
+    Client(Map stormConf, ChannelFactory factory, HashedWheelTimer scheduler, String host, int port, Context context) {
+        this.stormConf = stormConf;
+        this.scheduler = scheduler;
+        this.context = context;
+        int bufferSize = ObjectReader.getInt(stormConf.get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE));
+        // if SASL authentication is disabled, saslChannelReady is initialized as true; otherwise false
+        saslChannelReady.set(!ObjectReader.getBoolean(stormConf.get(Config.STORM_MESSAGING_NETTY_AUTHENTICATION), false));
+        LOG.info("creating Netty Client, connecting to {}:{}, bufferSize: {}", host, port, bufferSize);
+        int messageBatchSize = ObjectReader.getInt(stormConf.get(Config.STORM_NETTY_MESSAGE_BATCH_SIZE), 262144);
+
+        int maxReconnectionAttempts = ObjectReader.getInt(stormConf.get(Config.STORM_MESSAGING_NETTY_MAX_RETRIES));
+        int minWaitMs = ObjectReader.getInt(stormConf.get(Config.STORM_MESSAGING_NETTY_MIN_SLEEP_MS));
+        int maxWaitMs = ObjectReader.getInt(stormConf.get(Config.STORM_MESSAGING_NETTY_MAX_SLEEP_MS));
+        retryPolicy = new StormBoundedExponentialBackoffRetry(minWaitMs, maxWaitMs, maxReconnectionAttempts);
+
+        bootstrap = createClientBootstrap(factory, bufferSize, stormConf);
+        dstAddress = new InetSocketAddress(host, port);
+        dstAddressPrefixedName = prefixedName(dstAddress);
+        // Assign every remaining field before any background task is started: the timer and
+        // scheduler threads get a reference to this object and must not see it half-built.
+        batcher = new MessageBuffer(messageBatchSize);
+
+        // Initiate connection to remote destination
+        launchChannelAliveThread();
+        scheduleConnect(NO_DELAY_MS);
+    }
+
+    /**
+     * Schedules a recurring task on the shared timer that, every CHANNEL_ALIVE_INTERVAL_MS,
+     * asks for the connected channel. getConnectedChannel() attempts to refresh the connection
+     * when the channel is not connected. The task cancels itself once the client is closing.
+     */
+    private void launchChannelAliveThread() {
+        // netty TimerTask is already imported, hence the fully qualified name
+        timer.schedule(new java.util.TimerTask() {
+            @Override
+            public void run() {
+                try {
+                    LOG.debug("running timer task, address {}", dstAddress);
+                    if (closing) {
+                        this.cancel();
+                        return;
+                    }
+                    getConnectedChannel();
+                } catch (Exception exp) {
+                    // Pass the exception as the throwable argument so its stack trace is logged
+                    LOG.error("channel connection error", exp);
+                }
+            }
+        }, 0, CHANNEL_ALIVE_INTERVAL_MS);
+    }
+
+    /**
+     * Builds the Netty bootstrap for this client: TCP no-delay and keep-alive enabled, the send
+     * buffer sized to {@code bufferSize}, and a StormClientPipelineFactory bound to this client.
+     */
+    private ClientBootstrap createClientBootstrap(ChannelFactory factory, int bufferSize, Map stormConf) {
+        ClientBootstrap clientBootstrap = new ClientBootstrap(factory);
+        clientBootstrap.setOption("tcpNoDelay", true);
+        clientBootstrap.setOption("sendBufferSize", bufferSize);
+        clientBootstrap.setOption("keepAlive", true);
+        clientBootstrap.setPipelineFactory(new StormClientPipelineFactory(this, stormConf));
+        return clientBootstrap;
+    }
+
+    /** Returns PREFIX followed by the address, or an empty string for a null address. */
+    private String prefixedName(InetSocketAddress dstAddress) {
+        if (dstAddress == null) {
+            return "";
+        }
+        return PREFIX + dstAddress.toString();
+    }
+
+ /**
+ * Schedule a connection attempt to the destination address on the scheduler.
+ * @param delayMs how long to wait, in milliseconds, before the attempt runs
+ */
+ private void scheduleConnect(long delayMs) {
+ scheduler.newTimeout(new Connect(dstAddress), delayMs, TimeUnit.MILLISECONDS);
+ }
+
+ /** Reconnecting is allowed as long as this client is not being closed. */
+ private boolean reconnectingAllowed() {
+ return !closing;
+ }
+
+    /** Returns true only for a non-null channel that reports itself as connected. */
+    private boolean connectionEstablished(Channel channel) {
+        // Because we are using TCP (which is a connection-oriented transport unlike UDP), a connection is only fully
+        // established iff the channel is connected. That is, a TCP-based channel must be in the CONNECTED state before
+        // anything can be read or written to the channel.
+        //
+        // See:
+        // - http://netty.io/3.9/api/org/jboss/netty/channel/ChannelEvent.html
+        // - http://stackoverflow.com/questions/13356622/what-are-the-netty-channel-state-transitions
+        if (channel == null) {
+            return false;
+        }
+        return channel.isConnected();
+    }
+
+    /**
+     * Note: Storm will check via this method whether a worker can be activated safely during the initial startup of a
+     * topology. The worker will only be activated once all of the its connections are ready.
+     *
+     * @return Closed while closing; Ready when the channel is connected and the SASL channel is
+     *         ready; Connecting otherwise
+     */
+    @Override
+    public Status status() {
+        if (closing) {
+            return Status.Closed;
+        }
+        if (!connectionEstablished(channelRef.get())) {
+            return Status.Connecting;
+        }
+        // A connected channel still counts as connecting until the sasl channel is also ready
+        return saslChannelReady.get() ? Status.Ready : Status.Connecting;
+    }
+
+ /**
+ * Receiving messages is not supported by a client.
+ *
+ * @param cb not used
+ * @throws java.lang.UnsupportedOperationException whenever this method is being called.
+ */
+ @Override
+ public void registerRecv(IConnectionCallback cb) {
+ throw new UnsupportedOperationException("Client connection should not receive any messages");
+ }
+
+    /**
+     * Sending load metrics is not supported by a client.
+     *
+     * @param taskToLoad not used
+     * @throws java.lang.UnsupportedOperationException whenever this method is being called.
+     */
+    @Override
+    public void sendLoadMetrics(Map<Integer, Double> taskToLoad) {
+        // Same exception type as registerRecv; it is a RuntimeException, so callers are unaffected
+        throw new UnsupportedOperationException("Client connection should not send load metrics");
+    }
+
+    /** Wraps the payload in a single TaskMessage and sends it through the batch path. */
+    @Override
+    public void send(int taskId, byte[] payload) {
+        List<TaskMessage> single = new ArrayList<TaskMessage>(1);
+        single.add(new TaskMessage(taskId, payload));
+        send(single.iterator());
+    }
+
+ /**
+ * Enqueue task messages to be sent to the remote destination (cf. `host` and `port`).
+ *
+ * Messages are discarded when the client is closing, and dropped (counted as lost) when no
+ * connected channel is available. Otherwise they are added to the batcher; full batches are
+ * written right away and a partial batch is written too if the channel is currently writable.
+ *
+ * @param msgs the messages to send; null or empty is a no-op
+ */
+ @Override
+ public void send(Iterator<TaskMessage> msgs) {
+ if (closing) {
+ int numMessages = iteratorSize(msgs);
+ LOG.error("discarding {} messages because the Netty client to {} is being closed", numMessages,
+ dstAddressPrefixedName);
+ return;
+ }
+
+ if (!hasMessages(msgs)) {
+ return;
+ }
+
+ Channel channel = getConnectedChannel();
+ if (channel == null) {
+ /*
+ * Connection is unavailable. We will drop pending messages and let at-least-once message replay kick in.
+ *
+ * Another option would be to buffer the messages in memory. But this option has the risk of causing OOM errors,
+ * especially for topologies that disable message acking because we don't know whether the connection recovery will
+ * succeed or not, and how long the recovery will take.
+ */
+ dropMessages(msgs);
+ return;
+ }
+
+ synchronized (writeLock) {
+ while (msgs.hasNext()) {
+ TaskMessage message = msgs.next();
+ // add() hands back a batch once one is full; null means keep accumulating
+ MessageBatch full = batcher.add(message);
+ if(full != null){
+ flushMessages(channel, full);
+ }
+ }
+ }
+
+ if(channel.isWritable()){
+ synchronized (writeLock) {
+ // Netty's internal buffer is not full and we still have messages left in the buffer.
+ // We should write the unfilled MessageBatch immediately to reduce latency
+ MessageBatch batch = batcher.drain();
+ if(batch != null) {
+ flushMessages(channel, batch);
+ }
+ }
+ } else {
+ // Channel's buffer is full, meaning that we have time to wait for other messages to arrive, and create a bigger
+ // batch. This yields better throughput.
+ // We can rely on `notifyInterestChanged` to push these messages as soon as there is space in Netty's buffer
+ // because we know `Channel.isWritable` was false after the messages were already in the buffer.
+ }
+ }
+
+    /**
+     * Returns the current channel if it is connected. Otherwise closes it, triggers a reconnect
+     * and returns null.
+     */
+    private Channel getConnectedChannel() {
+        Channel current = channelRef.get();
+        if (connectionEstablished(current)) {
+            return current;
+        }
+        // Closing the channel and reconnecting should be done before handling the messages.
+        if (closeChannelAndReconnect(current)) {
+            // Log the connection error only once
+            LOG.error("connection to {} is unavailable", dstAddressPrefixedName);
+        }
+        return null;
+    }
+
+ /** @return the remote address this client sends to */
+ public InetSocketAddress getDstAddress() {
+ return dstAddress;
+ }
+
+    /** Returns true when the iterator is non-null and has at least one element left. */
+    private boolean hasMessages(Iterator<TaskMessage> msgs) {
+        if (msgs == null) {
+            return false;
+        }
+        return msgs.hasNext();
+    }
+
+    /** Consumes the iterator and adds the number of messages it held to the lost counter. */
+    private void dropMessages(Iterator<TaskMessage> msgs) {
+        // Counting traverses the iterator and thereby "empties" it.
+        messagesLost.getAndAdd(iteratorSize(msgs));
+    }
+
+    /**
+     * Counts the remaining elements of the iterator, consuming it in the process.
+     *
+     * @return the number of elements that were left, or 0 for a null iterator
+     */
+    private int iteratorSize(Iterator<TaskMessage> msgs) {
+        if (msgs == null) {
+            return 0;
+        }
+        int count = 0;
+        for (; msgs.hasNext(); msgs.next()) {
+            count++;
+        }
+        return count;
+    }
+
+    /**
+     * Asynchronously writes the message batch to the channel.
+     *
+     * If the write operation fails, then we will close the channel and trigger a reconnect.
+     * A null or empty batch is ignored.
+     *
+     * @param channel the channel to write to
+     * @param batch the batch to write; may be null
+     */
+    private void flushMessages(Channel channel, final MessageBatch batch) {
+        if (null == batch || batch.isEmpty()) {
+            return;
+        }
+
+        // Captured once so that the pending, sent and lost counters all move by the same amount
+        final int numMessages = batch.size();
+        LOG.debug("writing {} messages to channel {}", numMessages, channel.toString());
+        pendingMessages.addAndGet(numMessages);
+
+        ChannelFuture future = channel.write(batch);
+        future.addListener(new ChannelFutureListener() {
+            @Override
+            public void operationComplete(ChannelFuture future) throws Exception {
+                pendingMessages.addAndGet(0 - numMessages);
+                if (future.isSuccess()) {
+                    LOG.debug("sent {} messages to {}", numMessages, dstAddressPrefixedName);
+                    messagesSent.getAndAdd(numMessages);
+                } else {
+                    LOG.error("failed to send {} messages to {}: {}", numMessages, dstAddressPrefixedName,
+                            future.getCause());
+                    closeChannelAndReconnect(future.getChannel());
+                    messagesLost.getAndAdd(numMessages);
+                }
+            }
+
+        });
+    }
+
+    /**
+     * Closes the given channel and schedules a reconnect, provided the channel is non-null and
+     * this call is the one that swaps it out of the channel field for null. Winning that swap
+     * is what grants the right to provide a replacement.
+     *
+     * @param channel the channel to close; may be null
+     * @return true if this call scheduled a re-connect task
+     */
+    private boolean closeChannelAndReconnect(Channel channel) {
+        if (channel == null) {
+            return false;
+        }
+        channel.close();
+        if (!channelRef.compareAndSet(channel, null)) {
+            return false;
+        }
+        scheduleConnect(NO_DELAY_MS);
+        return true;
+    }
+
+    /**
+     * Gracefully close this client: unregister it from the context, wait for pending messages
+     * to be sent, then close the channel. Does nothing if the client is already closing.
+     */
+    @Override
+    public void close() {
+        if (closing) {
+            return;
+        }
+        LOG.info("closing Netty Client {}", dstAddressPrefixedName);
+        context.removeClient(dstAddress.getHostName(),dstAddress.getPort());
+        // Set closing to true to prevent any further reconnection attempts.
+        closing = true;
+        waitForPendingMessagesToBeSent();
+        closeChannel();
+    }
+
+    /**
+     * Blocks until no messages are pending, the flush timeout has elapsed, or the calling thread
+     * is interrupted. On interruption the wait ends early and the thread's interrupt flag is
+     * set again so that callers can still observe it.
+     */
+    private void waitForPendingMessagesToBeSent() {
+        LOG.info("waiting up to {} ms to send {} pending messages to {}",
+                PENDING_MESSAGES_FLUSH_TIMEOUT_MS, pendingMessages.get(), dstAddressPrefixedName);
+        long totalPendingMsgs = pendingMessages.get();
+        long startMs = System.currentTimeMillis();
+        while (pendingMessages.get() != 0) {
+            long deltaMs = System.currentTimeMillis() - startMs;
+            if (deltaMs > PENDING_MESSAGES_FLUSH_TIMEOUT_MS) {
+                LOG.error("failed to send all pending messages to {} within timeout, {} of {} messages were not " +
+                        "sent", dstAddressPrefixedName, pendingMessages.get(), totalPendingMsgs);
+                break;
+            }
+            try {
+                Thread.sleep(PENDING_MESSAGES_FLUSH_INTERVAL_MS);
+            } catch (InterruptedException e) {
+                // Stop waiting, but do not swallow the interrupt
+                Thread.currentThread().interrupt();
+                break;
+            }
+        }
+
+    }
+
+    /** Closes the current channel, if there is one. */
+    private void closeChannel() {
+        Channel current = channelRef.get();
+        if (current == null) {
+            return;
+        }
+        current.close();
+        LOG.debug("channel to {} closed", dstAddressPrefixedName);
+    }
+
+ /**
+ * Replaces the cached server load that getLoad() reads.
+ * @param taskToLoad map from task id to load; stored by reference, not copied
+ */
+ void setLoadMetrics(Map<Integer, Double> taskToLoad) {
+ this.serverLoad = taskToLoad;
+ }
+
+    /**
+     * Returns a Load for each requested task found in the cached server load. The client-side
+     * load is the number of pending messages, capped at 1024 and scaled to the range 0..1.
+     * The result is empty when no server load has been set yet.
+     */
+    @Override
+    public Map<Integer, Load> getLoad(Collection<Integer> tasks) {
+        Map<Integer, Load> loads = new HashMap<Integer, Load>();
+        // Read the volatile field once so the whole loop works on the same map
+        Map<Integer, Double> snapshot = serverLoad;
+        if (snapshot == null) {
+            return loads;
+        }
+        double clientLoad = Math.min(pendingMessages.get(), 1024)/1024.0;
+        for (Integer task : tasks) {
+            Double serverSide = snapshot.get(task);
+            if (serverSide == null) {
+                continue;
+            }
+            loads.put(task, new Load(true, serverSide, clientLoad));
+        }
+        return loads;
+    }
+
+    /**
+     * Returns a map of metrics for this connection. The reconnect, sent and lost counters are
+     * reset to 0 as they are read; the pending counter is only read. The "src" entry is present
+     * only when a local address is available.
+     */
+    @Override
+    public Object getState() {
+        LOG.debug("Getting metrics for client connection to {}", dstAddressPrefixedName);
+        HashMap<String, Object> metrics = new HashMap<String, Object>();
+        metrics.put("reconnects", totalConnectionAttempts.getAndSet(0));
+        metrics.put("sent", messagesSent.getAndSet(0));
+        metrics.put("pending", pendingMessages.get());
+        metrics.put("lostOnSend", messagesLost.getAndSet(0));
+        metrics.put("dest", dstAddress.toString());
+        String localAddress = srcAddressName();
+        if (localAddress != null) {
+            metrics.put("src", localAddress);
+        }
+        return metrics;
+    }
+
+ /**
+  * @return the configuration map held by this client (the same instance, not a copy)
+  */
+ public Map getConfig() {
+     return this.stormConf;
+ }
+
+ /** ISaslClient interface **/
+ public void channelConnected(Channel channel) {
+     // Deliberately a no-op: the former setChannel(channel) call is disabled.
+ }
+
+ /**
+  * Marks the SASL channel as ready by setting the {@code saslChannelReady} flag.
+  */
+ public void channelReady() {
+     this.saslChannelReady.set(true);
+ }
+
+ /**
+  * @return the value stored under {@code Config.TOPOLOGY_NAME} in the configuration
+  */
+ public String name() {
+     final Object topologyName = stormConf.get(Config.TOPOLOGY_NAME);
+     return (String) topologyName;
+ }
+
+ /**
+  * @return the secret key that {@code SaslUtils.getSecretKey} derives from the configuration
+  */
+ public String secretKey() {
+     final String key = SaslUtils.getSecretKey(stormConf);
+     return key;
+ }
+ /** end **/
+
+ private String srcAddressName() {
+ String name = null;
+ Channel channel = channelRef.get();
+ if (channel != null) {
+ SocketAddress address = channel.getLocalAddress();
+ if (address != null) {
+ name = address.toString();
+ }
+ }
+ return name;
+ }
+
+ /**
+  * @return a human-readable description naming the destination of this client
+  */
+ @Override
+ public String toString() {
+     final String description = String.format("Netty client for connecting to %s", dstAddressPrefixedName);
+     return description;
+ }
+
+ /**
+  * Called by Netty thread on change in channel interest.
+  *
+  * When the channel reports itself writable, the batcher is drained and the drained
+  * batch is flushed to the channel while holding {@code writeLock}.
+  *
+  * @param channel the channel whose interest changed
+  */
+ public void notifyInterestChanged(Channel channel) {
+     if (!channel.isWritable()) {
+         return;
+     }
+     synchronized (writeLock) {
+         // Channel is writable again, write if there are any messages pending
+         final MessageBatch drained = batcher.drain();
+         flushMessages(channel, drained);
+     }
+ }
+
+ /**
+  * Asynchronously establishes a Netty connection to the remote address
+  * This task runs on a single thread shared among all clients, and thus
+  * should not perform operations that block.
+  */
+ private class Connect implements TimerTask {
+
+     private final InetSocketAddress address;
+
+     public Connect(InetSocketAddress address) {
+         this.address = address;
+     }
+
+     /**
+      * Logs the failed attempt and schedules the next connection attempt after the delay
+      * given by the retry policy.
+      *
+      * @param t the cause of the failure, or {@code null} if none is known
+      */
+     private void reschedule(Throwable t) {
+         String failureMsg = String.format("connection attempt %s to %s failed", connectionAttempts,
+                 dstAddressPrefixedName);
+         if (t != null) {
+             failureMsg = failureMsg + ": " + t.toString();
+         }
+         LOG.error(failureMsg);
+         final long nextDelayMs = retryPolicy.getSleepTimeMs(connectionAttempts.get(), 0);
+         scheduleConnect(nextDelayMs);
+     }
+
+     /**
+      * Starts one connection attempt. If reconnecting is no longer allowed, the client is
+      * closed and a {@link RuntimeException} is thrown instead.
+      */
+     @Override
+     public void run(Timeout timeout) throws Exception {
+         if (!reconnectingAllowed()) {
+             close();
+             throw new RuntimeException("Giving up to scheduleConnect to " + dstAddressPrefixedName + " after "
+                     + connectionAttempts + " failed attempts. " + messagesLost.get() + " messages were lost");
+         }
+
+         final int connectionAttempt = connectionAttempts.getAndIncrement();
+         totalConnectionAttempts.getAndIncrement();
+
+         LOG.debug("connecting to {} [attempt {}]", address.toString(), connectionAttempt);
+         final ChannelFuture connectFuture = bootstrap.connect(address);
+         connectFuture.addListener(new ChannelFutureListener() {
+             @Override
+             public void operationComplete(ChannelFuture future) throws Exception {
+                 // This call returns immediately
+                 final Channel newChannel = future.getChannel();
+
+                 final boolean connected = future.isSuccess() && connectionEstablished(newChannel);
+                 if (!connected) {
+                     // Schedule the retry first, then discard the unusable channel.
+                     reschedule(future.getCause());
+                     if (newChannel != null) {
+                         newChannel.close();
+                     }
+                     return;
+                 }
+
+                 final boolean setChannel = channelRef.compareAndSet(null, newChannel);
+                 checkState(setChannel);
+                 LOG.debug("successfully connected to {}, {} [attempt {}]", address.toString(), newChannel.toString(),
+                         connectionAttempt);
+                 if (messagesLost.get() > 0) {
+                     LOG.warn("Re-connection to {} was successful but {} messages has been lost so far", address.toString(), messagesLost.get());
+                 }
+             }
+         });
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/messaging/netty/Context.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/Context.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/Context.java
new file mode 100644
index 0000000..0f7c66b
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/Context.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.messaging.netty;
+
+import org.apache.storm.utils.ObjectReader;
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+import org.jboss.netty.util.HashedWheelTimer;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.storm.Config;
+import org.apache.storm.messaging.IConnection;
+import org.apache.storm.messaging.IContext;
+
+/**
+ * Netty-backed {@link IContext}. Creates server connections via {@link #bind} and client
+ * connections via {@link #connect}; all clients share one channel factory and one timer.
+ */
+public class Context implements IContext {
+    @SuppressWarnings("rawtypes")
+    private Map storm_conf;
+    private Map<String, IConnection> connections;
+    private NioClientSocketChannelFactory clientChannelFactory;
+
+    private HashedWheelTimer clientScheduleService;
+
+    /**
+     * initialization per Storm configuration
+     *
+     * @param storm_conf the configuration; {@code Config.STORM_MESSAGING_NETTY_CLIENT_WORKER_THREADS}
+     *                   is read to size the client worker pool
+     */
+    @SuppressWarnings("rawtypes")
+    public void prepare(Map storm_conf) {
+        this.storm_conf = storm_conf;
+        connections = new HashMap<>();
+
+        //each context will have a single client channel factory
+        int maxWorkers = ObjectReader.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_CLIENT_WORKER_THREADS));
+        ThreadFactory bossFactory = new NettyRenameThreadFactory("client" + "-boss");
+        ThreadFactory workerFactory = new NettyRenameThreadFactory("client" + "-worker");
+        if (maxWorkers > 0) {
+            // An explicit worker count is only passed on when it is positive.
+            clientChannelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(bossFactory),
+                    Executors.newCachedThreadPool(workerFactory), maxWorkers);
+        } else {
+            clientChannelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(bossFactory),
+                    Executors.newCachedThreadPool(workerFactory));
+        }
+
+        clientScheduleService = new HashedWheelTimer(new NettyRenameThreadFactory("client-schedule-service"));
+    }
+
+    /**
+     * establish a server with a binding port
+     *
+     * @param storm_id used, together with the port, as the key the server is registered under
+     * @param port     the port handed to the new {@code Server}
+     * @return the newly created server connection
+     */
+    public synchronized IConnection bind(String storm_id, int port) {
+        IConnection server = new Server(storm_conf, port);
+        connections.put(key(storm_id, port), server);
+        return server;
+    }
+
+    /**
+     * establish a connection to a remote server
+     *
+     * An existing connection registered under the same host and port is reused.
+     *
+     * @param storm_id not used by this implementation
+     * @param host     remote host
+     * @param port     remote port
+     * @return the cached or newly created client connection
+     */
+    public synchronized IConnection connect(String storm_id, String host, int port) {
+        IConnection connection = connections.get(key(host, port));
+        if (connection != null) {
+            return connection;
+        }
+        IConnection client = new Client(storm_conf, clientChannelFactory,
+                clientScheduleService, host, port, this);
+        connections.put(key(host, port), client);
+        return client;
+    }
+
+    /**
+     * Drops the connection registered under the given host and port, if any.
+     * Does nothing once the connection map has been detached by {@link #term()}.
+     */
+    synchronized void removeClient(String host, int port) {
+        if (connections != null) {
+            connections.remove(key(host, port));
+        }
+    }
+
+    /**
+     * terminate this context
+     *
+     * Stops the timer, closes every registered connection and releases the resources of
+     * the client channel factory.
+     */
+    public synchronized void term() {
+        clientScheduleService.stop();
+
+        // Detach the map BEFORE closing anything: closing a client calls back into
+        // removeClient(), and removing from the map while iterating over its values
+        // would raise a ConcurrentModificationException. With the field already null,
+        // removeClient() becomes a no-op and the iteration below is safe.
+        Map<String, IConnection> toClose = connections;
+        connections = null;
+
+        if (toClose != null) {
+            for (IConnection conn : toClose.values()) {
+                conn.close();
+            }
+        }
+
+        //we need to release resources associated with client channel factory
+        clientChannelFactory.releaseExternalResources();
+    }
+
+    /** Builds the map key in the form {@code "<host>:<port>"}. */
+    private String key(String host, int port) {
+        return String.format("%s:%d", host, port);
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/messaging/netty/ControlMessage.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/ControlMessage.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/ControlMessage.java
new file mode 100644
index 0000000..3c7aaba
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/ControlMessage.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.messaging.netty;
+
+import java.io.IOException;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBufferOutputStream;
+import org.jboss.netty.buffer.ChannelBuffers;
+
+public enum ControlMessage implements INettySerializable {
+ CLOSE_MESSAGE((short)-100),
+ EOB_MESSAGE((short)-201),
+ OK_RESPONSE((short)-200),
+ FAILURE_RESPONSE((short)-400),
+ SASL_TOKEN_MESSAGE_REQUEST((short)-202),
+ SASL_COMPLETE_REQUEST((short)-203);
+
+ private short code;
+
+ //private constructor
+ private ControlMessage(short code) {
+ this.code = code;
+ }
+
+ /**
+ * @param encoded status code
+ * @return a control message per an encoded status code
+ */
+ public static ControlMessage mkMessage(short encoded) {
+ for(ControlMessage cm: ControlMessage.values()) {
+ if(encoded == cm.code) return cm;
+ }
+ return null;
+ }
+
+ public int encodeLength() {
+ return 2; //short
+ }
+
+ /**
+ * encode the current Control Message into a channel buffer
+ * @throws IOException
+ */
+ public ChannelBuffer buffer() throws IOException {
+ ChannelBufferOutputStream bout = new ChannelBufferOutputStream(ChannelBuffers.directBuffer(encodeLength()));
+ write(bout);
+ bout.close();
+ return bout.buffer();
+ }
+
+ public static ControlMessage read(byte[] serial) {
+ ChannelBuffer cm_buffer = ChannelBuffers.copiedBuffer(serial);
+ return mkMessage(cm_buffer.getShort(0));
+ }
+
+ public void write(ChannelBufferOutputStream bout) throws IOException {
+ bout.writeShort(code);
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/messaging/netty/INettySerializable.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/INettySerializable.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/INettySerializable.java
new file mode 100644
index 0000000..0a0236f
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/INettySerializable.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.messaging.netty;
+
+import java.io.IOException;
+import org.jboss.netty.buffer.ChannelBuffer;
+
+/**
+ * Contract for objects that can encode themselves into a Netty {@link ChannelBuffer}.
+ */
+public interface INettySerializable {
+ /**
+ * @return a buffer holding the encoded form of this object
+ * @throws IOException if encoding fails
+ */
+ ChannelBuffer buffer() throws IOException;
+ /**
+ * @return the size of the encoded form; presumably in bytes (ControlMessage reports 2
+ * for its short code) - confirm against other implementations
+ */
+ int encodeLength();
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/messaging/netty/ISaslClient.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/ISaslClient.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/ISaslClient.java
new file mode 100644
index 0000000..681c199
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/ISaslClient.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.messaging.netty;
+
+import org.jboss.netty.channel.Channel;
+import org.apache.storm.Config;
+
+/**
+ * Callbacks and credentials a client exposes for SASL handling.
+ * NOTE(review): the exact call sequence is defined by the SASL handlers, which are not
+ * visible here - confirm before relying on ordering.
+ */
+public interface ISaslClient {
+ /** Notifies the client about a connected channel. */
+ void channelConnected(Channel channel);
+ /** Notifies the client that the channel is ready (the Netty Client sets its SASL-ready flag here). */
+ void channelReady();
+ /** @return the client's name (the Netty Client returns the configured topology name) */
+ String name();
+ /** @return the secret key (the Netty Client obtains it via SaslUtils from its configuration) */
+ String secretKey();
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/messaging/netty/ISaslServer.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/ISaslServer.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/ISaslServer.java
new file mode 100644
index 0000000..997dbeb
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/ISaslServer.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.messaging.netty;
+
+import org.jboss.netty.channel.Channel;
+
+/**
+ * Server-side counterpart for SASL: extends {@link IServer} with a name, a secret key
+ * and an authentication callback.
+ */
+public interface ISaslServer extends IServer {
+ /** @return the server's name; presumably used as the SASL identity - TODO confirm */
+ String name();
+ /** @return the secret key; presumably the shared SASL secret - TODO confirm */
+ String secretKey();
+ /** Callback for a channel; presumably invoked once authentication has succeeded - TODO confirm. */
+ void authenticated(Channel c);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/4de339a8/storm-client/src/jvm/org/apache/storm/messaging/netty/IServer.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/IServer.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/IServer.java
new file mode 100644
index 0000000..b04d715
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/IServer.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.messaging.netty;
+
+import org.jboss.netty.channel.Channel;
+
+/**
+ * Callbacks a server receives for channel lifecycle events and incoming messages.
+ */
+public interface IServer {
+ /** Notifies the server about a connected channel. */
+ void channelConnected(Channel c);
+ /**
+ * Hands a received message to the server.
+ *
+ * @param message the received object
+ * @param remote identifies the sender; presumably the remote address - TODO confirm
+ * @param channel the channel the message arrived on
+ * @throws InterruptedException if the receiving thread is interrupted while handling the message
+ */
+ void received(Object message, String remote, Channel channel) throws InterruptedException;
+ /** Asks the server to close the given channel. */
+ void closeChannel(Channel c);
+}