You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2018/07/13 23:03:18 UTC
[1/4] storm git commit: STORM-3148: Avoid threading issues with kryo
Repository: storm
Updated Branches:
refs/heads/master e21110d33 -> c9e9a7c29
STORM-3148: Avoid threading issues with kryo
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/911a0d70
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/911a0d70
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/911a0d70
Branch: refs/heads/master
Commit: 911a0d7006ca79ec41d9a5692fbd417f1931cbb4
Parents: efb2e9a
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Thu Jul 12 15:58:20 2018 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Thu Jul 12 15:58:20 2018 -0500
----------------------------------------------------------------------
.../apache/storm/messaging/netty/Client.java | 4 ---
.../apache/storm/messaging/netty/Server.java | 8 ++---
.../netty/StormClientPipelineFactory.java | 3 +-
.../netty/StormServerPipelineFactory.java | 11 ++----
.../messaging/netty/BackPressureStatusTest.java | 35 ++++++++++++++++++++
5 files changed, 44 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/911a0d70/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java
index d46d785..fe2fe16 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java
@@ -118,8 +118,6 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
private final MessageBuffer batcher;
// wait strategy when the netty channel is not writable
private final IWaitStrategy waitStrategy;
- KryoValuesSerializer ser;
- KryoValuesDeserializer deser;
private volatile Map<Integer, Double> serverLoad = null;
/**
* This flag is set to true if and only if a client instance is being closed.
@@ -169,8 +167,6 @@ public class Client extends ConnectionWithStatus implements IStatefulObject, ISa
waitStrategy = ReflectionUtils.newInstance(clazz);
}
waitStrategy.prepare(topoConf, WAIT_SITUATION.BACK_PRESSURE_WAIT);
- ser = new KryoValuesSerializer(topoConf);
- deser = new KryoValuesDeserializer(topoConf);
}
/**
http://git-wip-us.apache.org/repos/asf/storm/blob/911a0d70/storm-client/src/jvm/org/apache/storm/messaging/netty/Server.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/Server.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/Server.java
index 6500abe..7d150c3 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/Server.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/Server.java
@@ -62,7 +62,6 @@ class Server extends ConnectionWithStatus implements IStatefulObject, ISaslServe
private final int port;
private final ChannelGroup allChannels = new DefaultChannelGroup("storm-server", GlobalEventExecutor.INSTANCE);
private final KryoValuesSerializer ser;
- private final KryoValuesDeserializer deser;
private volatile boolean closing = false;
private IConnectionCallback cb = null;
private Supplier<Object> newConnectionResponse;
@@ -71,7 +70,6 @@ class Server extends ConnectionWithStatus implements IStatefulObject, ISaslServe
this.topoConf = topoConf;
this.port = port;
ser = new KryoValuesSerializer(topoConf);
- deser = new KryoValuesDeserializer(topoConf);
// Configure the server.
int buffer_size = ObjectReader.getInt(topoConf.get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE));
@@ -97,7 +95,7 @@ class Server extends ConnectionWithStatus implements IStatefulObject, ISaslServe
.childOption(ChannelOption.SO_RCVBUF, buffer_size)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
- .childHandler(new StormServerPipelineFactory(ser, deser, topoConf, this));
+ .childHandler(new StormServerPipelineFactory(topoConf, this));
// Bind and start to accept incoming connections.
try {
@@ -171,7 +169,9 @@ class Server extends ConnectionWithStatus implements IStatefulObject, ISaslServe
@Override
public void sendLoadMetrics(Map<Integer, Double> taskToLoad) {
MessageBatch mb = new MessageBatch(1);
- mb.add(new TaskMessage(LOAD_METRICS_TASK_ID, ser.serialize(Collections.singletonList((Object) taskToLoad))));
+ synchronized (ser) {
+ mb.add(new TaskMessage(LOAD_METRICS_TASK_ID, ser.serialize(Collections.singletonList((Object) taskToLoad))));
+ }
allChannels.writeAndFlush(mb);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/911a0d70/storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientPipelineFactory.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientPipelineFactory.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientPipelineFactory.java
index 2a790ef..0c7bd0f 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientPipelineFactory.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientPipelineFactory.java
@@ -15,6 +15,7 @@ package org.apache.storm.messaging.netty;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.storm.Config;
+import org.apache.storm.serialization.KryoValuesDeserializer;
import org.apache.storm.shade.io.netty.channel.Channel;
import org.apache.storm.shade.io.netty.channel.ChannelInitializer;
import org.apache.storm.shade.io.netty.channel.ChannelPipeline;
@@ -36,7 +37,7 @@ class StormClientPipelineFactory extends ChannelInitializer<Channel> {
ChannelPipeline pipeline = ch.pipeline();
// Decoder
- pipeline.addLast("decoder", new MessageDecoder(client.deser));
+ pipeline.addLast("decoder", new MessageDecoder(new KryoValuesDeserializer(conf)));
// Encoder
pipeline.addLast("encoder", NettySerializableMessageEncoder.INSTANCE);
http://git-wip-us.apache.org/repos/asf/storm/blob/911a0d70/storm-client/src/jvm/org/apache/storm/messaging/netty/StormServerPipelineFactory.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/StormServerPipelineFactory.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/StormServerPipelineFactory.java
index 6e22fb7..1904f47 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/StormServerPipelineFactory.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/StormServerPipelineFactory.java
@@ -23,15 +23,10 @@ import org.apache.storm.shade.io.netty.channel.ChannelPipeline;
class StormServerPipelineFactory extends ChannelInitializer<Channel> {
- private final KryoValuesSerializer ser;
- private final KryoValuesDeserializer deser;
private final Map<String, Object> topoConf;
private final Server server;
- StormServerPipelineFactory(KryoValuesSerializer ser, KryoValuesDeserializer deser,
- Map<String, Object> topoConf, Server server) {
- this.ser = ser;
- this.deser = deser;
+ StormServerPipelineFactory(Map<String, Object> topoConf, Server server) {
this.topoConf = topoConf;
this.server = server;
}
@@ -42,10 +37,10 @@ class StormServerPipelineFactory extends ChannelInitializer<Channel> {
ChannelPipeline pipeline = ch.pipeline();
// Decoder
- pipeline.addLast("decoder", new MessageDecoder(deser));
+ pipeline.addLast("decoder", new MessageDecoder(new KryoValuesDeserializer(topoConf)));
// Encoders
pipeline.addLast("netty-serializable-encoder", NettySerializableMessageEncoder.INSTANCE);
- pipeline.addLast("backpressure-encoder", new BackPressureStatusEncoder(ser));
+ pipeline.addLast("backpressure-encoder", new BackPressureStatusEncoder(new KryoValuesSerializer(topoConf)));
boolean isNettyAuth = (Boolean) topoConf
.get(Config.STORM_MESSAGING_NETTY_AUTHENTICATION);
http://git-wip-us.apache.org/repos/asf/storm/blob/911a0d70/storm-client/test/jvm/org/apache/storm/messaging/netty/BackPressureStatusTest.java
----------------------------------------------------------------------
diff --git a/storm-client/test/jvm/org/apache/storm/messaging/netty/BackPressureStatusTest.java b/storm-client/test/jvm/org/apache/storm/messaging/netty/BackPressureStatusTest.java
new file mode 100644
index 0000000..ed6c59c
--- /dev/null
+++ b/storm-client/test/jvm/org/apache/storm/messaging/netty/BackPressureStatusTest.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ */
+
+package org.apache.storm.messaging.netty;
+
+import java.io.IOException;
+import java.util.Arrays;
+import org.apache.storm.serialization.KryoValuesSerializer;
+import org.apache.storm.shade.io.netty.buffer.ByteBuf;
+import org.apache.storm.shade.io.netty.buffer.UnpooledByteBufAllocator;
+import org.apache.storm.utils.Utils;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+class BackPressureStatusTest {
+
+ @Test
+ void bufferTest() throws IOException {
+ UnpooledByteBufAllocator alloc = new UnpooledByteBufAllocator(false);
+ KryoValuesSerializer ser = new KryoValuesSerializer(Utils.readStormConfig());
+
+ BackPressureStatus status = new BackPressureStatus("test-worker", Arrays.asList(1, 2, 3), Arrays.asList(4, 5, 6));
+ status.buffer(alloc, ser).release();
+ }
+}
\ No newline at end of file
[4/4] storm git commit: Merge branch 'STORM-3148' of
https://github.com/revans2/incubator-storm into STORM-3148
Posted by bo...@apache.org.
Merge branch 'STORM-3148' of https://github.com/revans2/incubator-storm into STORM-3148
STORM-3148: Avoid threading issues with kryo
This closes #2762
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c9e9a7c2
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c9e9a7c2
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c9e9a7c2
Branch: refs/heads/master
Commit: c9e9a7c294458c8bb1166e0646a5fa580661e21e
Parents: e21110d bfb267e
Author: Robert Evans <ev...@yahoo-inc.com>
Authored: Fri Jul 13 15:42:53 2018 -0500
Committer: Robert Evans <ev...@yahoo-inc.com>
Committed: Fri Jul 13 15:42:53 2018 -0500
----------------------------------------------------------------------
.../storm/daemon/worker/BackPressureTracker.java | 13 ++++++++-----
.../org/apache/storm/messaging/netty/Client.java | 4 ----
.../org/apache/storm/messaging/netty/Server.java | 8 ++++----
.../storm/messaging/netty/StormClientHandler.java | 16 ++++++++++++++--
.../messaging/netty/StormClientPipelineFactory.java | 3 ++-
.../messaging/netty/StormServerPipelineFactory.java | 11 +++--------
6 files changed, 31 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
[3/4] storm git commit: STORM-3148: Addressed review comments
Posted by bo...@apache.org.
STORM-3148: Addressed review comments
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/bfb267e8
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/bfb267e8
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/bfb267e8
Branch: refs/heads/master
Commit: bfb267e8025092e72ad0d586e50e1182b54f3242
Parents: 6301756
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Fri Jul 13 08:09:02 2018 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Fri Jul 13 08:09:02 2018 -0500
----------------------------------------------------------------------
.../messaging/netty/StormClientHandler.java | 4 +++
.../messaging/netty/BackPressureStatusTest.java | 35 --------------------
2 files changed, 4 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/bfb267e8/storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientHandler.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientHandler.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientHandler.java
index a903264..1661fc3 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientHandler.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientHandler.java
@@ -50,6 +50,8 @@ public class StormClientHandler extends ChannelInboundHandlerAdapter {
try {
remoteBpStatus[bpTask].set(true);
} catch (ArrayIndexOutOfBoundsException e) {
+ //Just in case we get something we are confused about
+ // we can continue processing the rest of the tasks
LOG.error("BP index out of bounds {}", e);
}
}
@@ -59,6 +61,8 @@ public class StormClientHandler extends ChannelInboundHandlerAdapter {
try {
remoteBpStatus[bpTask].set(false);
} catch (ArrayIndexOutOfBoundsException e) {
+ //Just in case we get something we are confused about
+ // we can continue processing the rest of the tasks
LOG.error("BP index out of bounds {}", e);
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/bfb267e8/storm-client/test/jvm/org/apache/storm/messaging/netty/BackPressureStatusTest.java
----------------------------------------------------------------------
diff --git a/storm-client/test/jvm/org/apache/storm/messaging/netty/BackPressureStatusTest.java b/storm-client/test/jvm/org/apache/storm/messaging/netty/BackPressureStatusTest.java
deleted file mode 100644
index ed6c59c..0000000
--- a/storm-client/test/jvm/org/apache/storm/messaging/netty/BackPressureStatusTest.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.messaging.netty;
-
-import java.io.IOException;
-import java.util.Arrays;
-import org.apache.storm.serialization.KryoValuesSerializer;
-import org.apache.storm.shade.io.netty.buffer.ByteBuf;
-import org.apache.storm.shade.io.netty.buffer.UnpooledByteBufAllocator;
-import org.apache.storm.utils.Utils;
-import org.junit.jupiter.api.Test;
-
-import static org.junit.jupiter.api.Assertions.*;
-
-class BackPressureStatusTest {
-
- @Test
- void bufferTest() throws IOException {
- UnpooledByteBufAllocator alloc = new UnpooledByteBufAllocator(false);
- KryoValuesSerializer ser = new KryoValuesSerializer(Utils.readStormConfig());
-
- BackPressureStatus status = new BackPressureStatus("test-worker", Arrays.asList(1, 2, 3), Arrays.asList(4, 5, 6));
- status.buffer(alloc, ser).release();
- }
-}
\ No newline at end of file
[2/4] storm git commit: STORM-3148: Fix backpressure issue with
system bolt
Posted by bo...@apache.org.
STORM-3148: Fix backpressure issue with system bolt
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/63017568
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/63017568
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/63017568
Branch: refs/heads/master
Commit: 63017568eb543571804d0475c975a9c5382465f9
Parents: 911a0d7
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Thu Jul 12 16:49:04 2018 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Thu Jul 12 16:49:04 2018 -0500
----------------------------------------------------------------------
.../storm/daemon/worker/BackPressureTracker.java | 13 ++++++++-----
.../storm/messaging/netty/StormClientHandler.java | 12 ++++++++++--
2 files changed, 18 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/63017568/storm-client/src/jvm/org/apache/storm/daemon/worker/BackPressureTracker.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/BackPressureTracker.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/BackPressureTracker.java
index a4e87ba..7e98658 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/BackPressureTracker.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/BackPressureTracker.java
@@ -81,11 +81,14 @@ public class BackPressureTracker {
ArrayList<Integer> nonBpTasks = new ArrayList<>(tasks.size());
for (Entry<Integer, BackpressureState> entry : tasks.entrySet()) {
- boolean backpressure = entry.getValue().backpressure.get();
- if (backpressure) {
- bpTasks.add(entry.getKey());
- } else {
- nonBpTasks.add(entry.getKey());
+ //System bolt is not a part of backpressure.
+ if (entry.getKey() >= 0) {
+ boolean backpressure = entry.getValue().backpressure.get();
+ if (backpressure) {
+ bpTasks.add(entry.getKey());
+ } else {
+ nonBpTasks.add(entry.getKey());
+ }
}
}
return new BackPressureStatus(workerId, bpTasks, nonBpTasks);
http://git-wip-us.apache.org/repos/asf/storm/blob/63017568/storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientHandler.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientHandler.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientHandler.java
index 4c38344..a903264 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientHandler.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientHandler.java
@@ -47,12 +47,20 @@ public class StormClientHandler extends ChannelInboundHandlerAdapter {
BackPressureStatus status = (BackPressureStatus) message;
if (status.bpTasks != null) {
for (Integer bpTask : status.bpTasks) {
- remoteBpStatus[bpTask].set(true);
+ try {
+ remoteBpStatus[bpTask].set(true);
+ } catch (ArrayIndexOutOfBoundsException e) {
+ LOG.error("BP index out of bounds {}", e);
+ }
}
}
if (status.nonBpTasks != null) {
for (Integer bpTask : status.nonBpTasks) {
- remoteBpStatus[bpTask].set(false);
+ try {
+ remoteBpStatus[bpTask].set(false);
+ } catch (ArrayIndexOutOfBoundsException e) {
+ LOG.error("BP index out of bounds {}", e);
+ }
}
}
LOG.debug("Received BackPressure status update : {}", status);