You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2018/07/13 23:03:18 UTC

[1/4] storm git commit: STORM-3148: Avoid threading issues with kryo

Repository: storm
Updated Branches:
  refs/heads/master e21110d33 -> c9e9a7c29


STORM-3148: Avoid threading issues with kryo


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/911a0d70
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/911a0d70
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/911a0d70

Branch: refs/heads/master
Commit: 911a0d7006ca79ec41d9a5692fbd417f1931cbb4
Parents: efb2e9a
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Thu Jul 12 15:58:20 2018 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Thu Jul 12 15:58:20 2018 -0500

----------------------------------------------------------------------
 .../apache/storm/messaging/netty/Client.java    |  4 ---
 .../apache/storm/messaging/netty/Server.java    |  8 ++---
 .../netty/StormClientPipelineFactory.java       |  3 +-
 .../netty/StormServerPipelineFactory.java       | 11 ++----
 .../messaging/netty/BackPressureStatusTest.java | 35 ++++++++++++++++++++
 5 files changed, 44 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/911a0d70/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java
index d46d785..fe2fe16 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java
@@ -118,8 +118,6 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
     private final MessageBuffer batcher;
     // wait strategy when the netty channel is not writable
     private final IWaitStrategy waitStrategy;
-    KryoValuesSerializer ser;
-    KryoValuesDeserializer deser;
     private volatile Map<Integer, Double> serverLoad = null;
     /**
      * This flag is set to true if and only if a client instance is being closed.
@@ -169,8 +167,6 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
             waitStrategy = ReflectionUtils.newInstance(clazz);
         }
         waitStrategy.prepare(topoConf, WAIT_SITUATION.BACK_PRESSURE_WAIT);
-        ser = new KryoValuesSerializer(topoConf);
-        deser = new KryoValuesDeserializer(topoConf);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/storm/blob/911a0d70/storm-client/src/jvm/org/apache/storm/messaging/netty/Server.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/Server.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/Server.java
index 6500abe..7d150c3 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/Server.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/Server.java
@@ -62,7 +62,6 @@ class Server extends ConnectionWithStatus implements IStatefulObject, ISaslServe
     private final int port;
     private final ChannelGroup allChannels = new DefaultChannelGroup("storm-server", GlobalEventExecutor.INSTANCE);
     private final KryoValuesSerializer ser;
-    private final KryoValuesDeserializer deser;
     private volatile boolean closing = false;
     private IConnectionCallback cb = null;
     private Supplier<Object> newConnectionResponse;
@@ -71,7 +70,6 @@ class Server extends ConnectionWithStatus implements IStatefulObject, ISaslServe
         this.topoConf = topoConf;
         this.port = port;
         ser = new KryoValuesSerializer(topoConf);
-        deser = new KryoValuesDeserializer(topoConf);
 
         // Configure the server.
         int buffer_size = ObjectReader.getInt(topoConf.get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE));
@@ -97,7 +95,7 @@ class Server extends ConnectionWithStatus implements IStatefulObject, ISaslServe
             .childOption(ChannelOption.SO_RCVBUF, buffer_size)
             .childOption(ChannelOption.SO_KEEPALIVE, true)
             .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
-            .childHandler(new StormServerPipelineFactory(ser, deser, topoConf, this));
+            .childHandler(new StormServerPipelineFactory(topoConf, this));
 
         // Bind and start to accept incoming connections.
         try {
@@ -171,7 +169,9 @@ class Server extends ConnectionWithStatus implements IStatefulObject, ISaslServe
     @Override
     public void sendLoadMetrics(Map<Integer, Double> taskToLoad) {
         MessageBatch mb = new MessageBatch(1);
-        mb.add(new TaskMessage(LOAD_METRICS_TASK_ID, ser.serialize(Collections.singletonList((Object) taskToLoad))));
+        synchronized (ser) {
+            mb.add(new TaskMessage(LOAD_METRICS_TASK_ID, ser.serialize(Collections.singletonList((Object) taskToLoad))));
+        }
         allChannels.writeAndFlush(mb);
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/911a0d70/storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientPipelineFactory.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientPipelineFactory.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientPipelineFactory.java
index 2a790ef..0c7bd0f 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientPipelineFactory.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientPipelineFactory.java
@@ -15,6 +15,7 @@ package org.apache.storm.messaging.netty;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.storm.Config;
+import org.apache.storm.serialization.KryoValuesDeserializer;
 import org.apache.storm.shade.io.netty.channel.Channel;
 import org.apache.storm.shade.io.netty.channel.ChannelInitializer;
 import org.apache.storm.shade.io.netty.channel.ChannelPipeline;
@@ -36,7 +37,7 @@ class StormClientPipelineFactory extends ChannelInitializer<Channel> {
         ChannelPipeline pipeline = ch.pipeline();
 
         // Decoder
-        pipeline.addLast("decoder", new MessageDecoder(client.deser));
+        pipeline.addLast("decoder", new MessageDecoder(new KryoValuesDeserializer(conf)));
         // Encoder
         pipeline.addLast("encoder", NettySerializableMessageEncoder.INSTANCE);
 

http://git-wip-us.apache.org/repos/asf/storm/blob/911a0d70/storm-client/src/jvm/org/apache/storm/messaging/netty/StormServerPipelineFactory.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/StormServerPipelineFactory.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/StormServerPipelineFactory.java
index 6e22fb7..1904f47 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/StormServerPipelineFactory.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/StormServerPipelineFactory.java
@@ -23,15 +23,10 @@ import org.apache.storm.shade.io.netty.channel.ChannelPipeline;
 
 class StormServerPipelineFactory extends ChannelInitializer<Channel> {
 
-    private final KryoValuesSerializer ser;
-    private final KryoValuesDeserializer deser;
     private final Map<String, Object> topoConf;
     private final Server server;
 
-    StormServerPipelineFactory(KryoValuesSerializer ser, KryoValuesDeserializer deser,
-        Map<String, Object> topoConf, Server server) {
-        this.ser = ser;
-        this.deser = deser;
+    StormServerPipelineFactory(Map<String, Object> topoConf, Server server) {
         this.topoConf = topoConf;
         this.server = server;
     }
@@ -42,10 +37,10 @@ class StormServerPipelineFactory extends ChannelInitializer<Channel> {
         ChannelPipeline pipeline = ch.pipeline();
 
         // Decoder
-        pipeline.addLast("decoder", new MessageDecoder(deser));
+        pipeline.addLast("decoder", new MessageDecoder(new KryoValuesDeserializer(topoConf)));
         // Encoders
         pipeline.addLast("netty-serializable-encoder", NettySerializableMessageEncoder.INSTANCE);
-        pipeline.addLast("backpressure-encoder", new BackPressureStatusEncoder(ser));
+        pipeline.addLast("backpressure-encoder", new BackPressureStatusEncoder(new KryoValuesSerializer(topoConf)));
 
         boolean isNettyAuth = (Boolean) topoConf
             .get(Config.STORM_MESSAGING_NETTY_AUTHENTICATION);

http://git-wip-us.apache.org/repos/asf/storm/blob/911a0d70/storm-client/test/jvm/org/apache/storm/messaging/netty/BackPressureStatusTest.java
----------------------------------------------------------------------
diff --git a/storm-client/test/jvm/org/apache/storm/messaging/netty/BackPressureStatusTest.java b/storm-client/test/jvm/org/apache/storm/messaging/netty/BackPressureStatusTest.java
new file mode 100644
index 0000000..ed6c59c
--- /dev/null
+++ b/storm-client/test/jvm/org/apache/storm/messaging/netty/BackPressureStatusTest.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ */
+
+package org.apache.storm.messaging.netty;
+
+import java.io.IOException;
+import java.util.Arrays;
+import org.apache.storm.serialization.KryoValuesSerializer;
+import org.apache.storm.shade.io.netty.buffer.ByteBuf;
+import org.apache.storm.shade.io.netty.buffer.UnpooledByteBufAllocator;
+import org.apache.storm.utils.Utils;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+class BackPressureStatusTest {
+
+    @Test
+    void bufferTest() throws IOException {
+        UnpooledByteBufAllocator alloc = new UnpooledByteBufAllocator(false);
+        KryoValuesSerializer ser = new KryoValuesSerializer(Utils.readStormConfig());
+
+        BackPressureStatus status = new BackPressureStatus("test-worker", Arrays.asList(1, 2, 3), Arrays.asList(4, 5, 6));
+        status.buffer(alloc, ser).release();
+    }
+}
\ No newline at end of file


[4/4] storm git commit: Merge branch 'STORM-3148' of https://github.com/revans2/incubator-storm into STORM-3148

Posted by bo...@apache.org.
Merge branch 'STORM-3148' of https://github.com/revans2/incubator-storm into STORM-3148

STORM-3148: Avoid threading issues with kryo

This closes #2762


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c9e9a7c2
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c9e9a7c2
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c9e9a7c2

Branch: refs/heads/master
Commit: c9e9a7c294458c8bb1166e0646a5fa580661e21e
Parents: e21110d bfb267e
Author: Robert Evans <ev...@yahoo-inc.com>
Authored: Fri Jul 13 15:42:53 2018 -0500
Committer: Robert Evans <ev...@yahoo-inc.com>
Committed: Fri Jul 13 15:42:53 2018 -0500

----------------------------------------------------------------------
 .../storm/daemon/worker/BackPressureTracker.java    | 13 ++++++++-----
 .../org/apache/storm/messaging/netty/Client.java    |  4 ----
 .../org/apache/storm/messaging/netty/Server.java    |  8 ++++----
 .../storm/messaging/netty/StormClientHandler.java   | 16 ++++++++++++++--
 .../messaging/netty/StormClientPipelineFactory.java |  3 ++-
 .../messaging/netty/StormServerPipelineFactory.java | 11 +++--------
 6 files changed, 31 insertions(+), 24 deletions(-)
----------------------------------------------------------------------



[3/4] storm git commit: STORM-3148: Addressed review comments

Posted by bo...@apache.org.
STORM-3148: Addressed review comments


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/bfb267e8
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/bfb267e8
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/bfb267e8

Branch: refs/heads/master
Commit: bfb267e8025092e72ad0d586e50e1182b54f3242
Parents: 6301756
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Fri Jul 13 08:09:02 2018 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Fri Jul 13 08:09:02 2018 -0500

----------------------------------------------------------------------
 .../messaging/netty/StormClientHandler.java     |  4 +++
 .../messaging/netty/BackPressureStatusTest.java | 35 --------------------
 2 files changed, 4 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/bfb267e8/storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientHandler.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientHandler.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientHandler.java
index a903264..1661fc3 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientHandler.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientHandler.java
@@ -50,6 +50,8 @@ public class StormClientHandler extends ChannelInboundHandlerAdapter {
                     try {
                         remoteBpStatus[bpTask].set(true);
                     } catch (ArrayIndexOutOfBoundsException e) {
+                        //Just in case we get something we are confused about
+                        // we can continue processing the rest of the tasks
                         LOG.error("BP index out of bounds {}", e);
                     }
                 }
@@ -59,6 +61,8 @@ public class StormClientHandler extends ChannelInboundHandlerAdapter {
                     try {
                         remoteBpStatus[bpTask].set(false);
                     } catch (ArrayIndexOutOfBoundsException e) {
+                        //Just in case we get something we are confused about
+                        // we can continue processing the rest of the tasks
                         LOG.error("BP index out of bounds {}", e);
                     }
                 }

http://git-wip-us.apache.org/repos/asf/storm/blob/bfb267e8/storm-client/test/jvm/org/apache/storm/messaging/netty/BackPressureStatusTest.java
----------------------------------------------------------------------
diff --git a/storm-client/test/jvm/org/apache/storm/messaging/netty/BackPressureStatusTest.java b/storm-client/test/jvm/org/apache/storm/messaging/netty/BackPressureStatusTest.java
deleted file mode 100644
index ed6c59c..0000000
--- a/storm-client/test/jvm/org/apache/storm/messaging/netty/BackPressureStatusTest.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.messaging.netty;
-
-import java.io.IOException;
-import java.util.Arrays;
-import org.apache.storm.serialization.KryoValuesSerializer;
-import org.apache.storm.shade.io.netty.buffer.ByteBuf;
-import org.apache.storm.shade.io.netty.buffer.UnpooledByteBufAllocator;
-import org.apache.storm.utils.Utils;
-import org.junit.jupiter.api.Test;
-
-import static org.junit.jupiter.api.Assertions.*;
-
-class BackPressureStatusTest {
-
-    @Test
-    void bufferTest() throws IOException {
-        UnpooledByteBufAllocator alloc = new UnpooledByteBufAllocator(false);
-        KryoValuesSerializer ser = new KryoValuesSerializer(Utils.readStormConfig());
-
-        BackPressureStatus status = new BackPressureStatus("test-worker", Arrays.asList(1, 2, 3), Arrays.asList(4, 5, 6));
-        status.buffer(alloc, ser).release();
-    }
-}
\ No newline at end of file


[2/4] storm git commit: STORM-3148: Fix backpressure issue with system bolt

Posted by bo...@apache.org.
STORM-3148:  Fix backpressure issue with system bolt


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/63017568
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/63017568
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/63017568

Branch: refs/heads/master
Commit: 63017568eb543571804d0475c975a9c5382465f9
Parents: 911a0d7
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Thu Jul 12 16:49:04 2018 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Thu Jul 12 16:49:04 2018 -0500

----------------------------------------------------------------------
 .../storm/daemon/worker/BackPressureTracker.java       | 13 ++++++++-----
 .../storm/messaging/netty/StormClientHandler.java      | 12 ++++++++++--
 2 files changed, 18 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/63017568/storm-client/src/jvm/org/apache/storm/daemon/worker/BackPressureTracker.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/BackPressureTracker.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/BackPressureTracker.java
index a4e87ba..7e98658 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/BackPressureTracker.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/BackPressureTracker.java
@@ -81,11 +81,14 @@ public class BackPressureTracker {
         ArrayList<Integer> nonBpTasks = new ArrayList<>(tasks.size());
 
         for (Entry<Integer, BackpressureState> entry : tasks.entrySet()) {
-            boolean backpressure = entry.getValue().backpressure.get();
-            if (backpressure) {
-                bpTasks.add(entry.getKey());
-            } else {
-                nonBpTasks.add(entry.getKey());
+            //System bolt is not a part of backpressure.
+            if (entry.getKey() >= 0) {
+                boolean backpressure = entry.getValue().backpressure.get();
+                if (backpressure) {
+                    bpTasks.add(entry.getKey());
+                } else {
+                    nonBpTasks.add(entry.getKey());
+                }
             }
         }
         return new BackPressureStatus(workerId, bpTasks, nonBpTasks);

http://git-wip-us.apache.org/repos/asf/storm/blob/63017568/storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientHandler.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientHandler.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientHandler.java
index 4c38344..a903264 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientHandler.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientHandler.java
@@ -47,12 +47,20 @@ public class StormClientHandler extends ChannelInboundHandlerAdapter {
             BackPressureStatus status = (BackPressureStatus) message;
             if (status.bpTasks != null) {
                 for (Integer bpTask : status.bpTasks) {
-                    remoteBpStatus[bpTask].set(true);
+                    try {
+                        remoteBpStatus[bpTask].set(true);
+                    } catch (ArrayIndexOutOfBoundsException e) {
+                        LOG.error("BP index out of bounds {}", e);
+                    }
                 }
             }
             if (status.nonBpTasks != null) {
                 for (Integer bpTask : status.nonBpTasks) {
-                    remoteBpStatus[bpTask].set(false);
+                    try {
+                        remoteBpStatus[bpTask].set(false);
+                    } catch (ArrayIndexOutOfBoundsException e) {
+                        LOG.error("BP index out of bounds {}", e);
+                    }
                 }
             }
             LOG.debug("Received BackPressure status update : {}", status);