You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2018/07/10 20:28:26 UTC
[1/3] storm git commit: STORM-3141: Fix NPE in
WorkerState.transferLocalBatch,
and refactor BackpressureTracker to get rid of placeholder JCQueue
Repository: storm
Updated Branches:
refs/heads/master 2d7c7d316 -> 07c795af5
STORM-3141: Fix NPE in WorkerState.transferLocalBatch, and refactor BackpressureTracker to get rid of placeholder JCQueue
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/16132068
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/16132068
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/16132068
Branch: refs/heads/master
Commit: 1613206802ffa00f1b6b9ab56a741e86aeff6a5e
Parents: 26d2f95
Author: Stig Rohde Døssing <sr...@apache.org>
Authored: Mon Jul 2 21:42:54 2018 +0200
Committer: Stig Rohde Døssing <sr...@apache.org>
Committed: Mon Jul 2 21:56:27 2018 +0200
----------------------------------------------------------------------
.../daemon/worker/BackPressureTracker.java | 63 ++++++++++++--------
.../apache/storm/daemon/worker/WorkerState.java | 31 ++++------
.../java/org/apache/storm/MessagingTest.java | 34 +++++++++++
3 files changed, 84 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/16132068/storm-client/src/jvm/org/apache/storm/daemon/worker/BackPressureTracker.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/BackPressureTracker.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/BackPressureTracker.java
index bc396aa..8d96447 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/BackPressureTracker.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/BackPressureTracker.java
@@ -22,39 +22,35 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
-import org.apache.storm.Constants;
import org.apache.storm.messaging.netty.BackPressureStatus;
import org.apache.storm.utils.JCQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.storm.Constants.SYSTEM_TASK_ID;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import java.util.stream.Collectors;
+import org.apache.storm.shade.org.apache.commons.lang.builder.ToStringBuilder;
+import org.apache.storm.shade.org.apache.commons.lang.builder.ToStringStyle;
/***
- * Tracks the BackPressure status using a Map<TaskId, JCQueue>.
- * Special value NONE, is used to indicate that the task is not under BackPressure
- * ConcurrentHashMap does not allow storing null values, so we use the special value NONE instead.
+ * Tracks the BackPressure status.
*/
public class BackPressureTracker {
static final Logger LOG = LoggerFactory.getLogger(BackPressureTracker.class);
- private static final JCQueue NONE = new JCQueue("NoneQ", 2, 0, 1, null,
- "none", Constants.SYSTEM_COMPONENT_ID, -1, 0) {
- };
- private final Map<Integer, JCQueue> tasks = new ConcurrentHashMap<>(); // updates are more frequent than iteration
+ private final Map<Integer, BackpressureState> tasks;
private final String workerId;
- public BackPressureTracker(String workerId, List<Integer> allLocalTasks) {
+ public BackPressureTracker(String workerId, Map<Integer, JCQueue> localTasksToQueues) {
this.workerId = workerId;
- for (Integer taskId : allLocalTasks) {
- if (taskId != SYSTEM_TASK_ID) {
- tasks.put(taskId, NONE); // all tasks are considered to be not under BP initially
- }
- }
+ this.tasks = localTasksToQueues.entrySet().stream()
+ .collect(Collectors.toMap(
+ entry -> entry.getKey(),
+ entry -> new BackpressureState(entry.getValue())));
}
private void recordNoBackPressure(Integer taskId) {
- tasks.put(taskId, NONE);
+ tasks.get(taskId).backpressure.set(false);
}
/***
@@ -62,16 +58,17 @@ public class BackPressureTracker {
* This is called by transferLocalBatch() on NettyWorker thread
* @return true if an update was recorded, false if taskId is already under BP
*/
- public boolean recordBackPressure(Integer taskId, JCQueue recvQ) {
- return tasks.put(taskId, recvQ) == NONE;
+ public boolean recordBackPressure(Integer taskId) {
+ return tasks.get(taskId).backpressure.getAndSet(true);
}
// returns true if there was a change in the BP situation
public boolean refreshBpTaskList() {
boolean changed = false;
LOG.debug("Running Back Pressure status change check");
- for (Entry<Integer, JCQueue> entry : tasks.entrySet()) {
- if (entry.getValue() != NONE && entry.getValue().isEmptyOverflow()) {
+ for (Entry<Integer, BackpressureState> entry : tasks.entrySet()) {
+ BackpressureState state = entry.getValue();
+ if (state.backpressure.get() && state.queue.isEmptyOverflow()) {
recordNoBackPressure(entry.getKey());
changed = true;
}
@@ -83,9 +80,9 @@ public class BackPressureTracker {
ArrayList<Integer> bpTasks = new ArrayList<>(tasks.size());
ArrayList<Integer> nonBpTasks = new ArrayList<>(tasks.size());
- for (Entry<Integer, JCQueue> entry : tasks.entrySet()) {
- JCQueue q = entry.getValue();
- if (q != NONE) {
+ for (Entry<Integer, BackpressureState> entry : tasks.entrySet()) {
+ boolean backpressure = entry.getValue().backpressure.get();
+ if (backpressure) {
bpTasks.add(entry.getKey());
} else {
nonBpTasks.add(entry.getKey());
@@ -93,4 +90,22 @@ public class BackPressureTracker {
}
return new BackPressureStatus(workerId, bpTasks, nonBpTasks);
}
+
+ private static class BackpressureState {
+ private final JCQueue queue;
+ //No task is under backpressure initially
+ private final AtomicBoolean backpressure = new AtomicBoolean(false);
+
+ public BackpressureState(JCQueue queue) {
+ this.queue = queue;
+ }
+
+ @Override
+ public String toString() {
+ return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
+ .append(queue)
+ .append(backpressure)
+ .toString();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/16132068/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
index ab814da..9510bc0 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
@@ -114,11 +114,9 @@ public class WorkerState {
final ReentrantReadWriteLock endpointSocketLock;
final AtomicReference<Map<Integer, NodeInfo>> cachedTaskToNodePort;
final AtomicReference<Map<NodeInfo, IConnection>> cachedNodeToPortSocket;
- final Map<List<Long>, JCQueue> executorReceiveQueueMap;
// executor id is in form [start_task_id end_task_id]
- // short executor id is start_task_id
- final Map<Integer, JCQueue> shortExecutorReceiveQueueMap;
- final Map<Integer, Integer> taskToShortExecutor;
+ final Map<List<Long>, JCQueue> executorReceiveQueueMap;
+ final Map<Integer, JCQueue> taskToExecutorQueue;
final Runnable suicideCallback;
final Utils.UptimeComputer uptime;
final Map<String, Object> defaultSharedResources;
@@ -166,12 +164,15 @@ public class WorkerState {
this.isTopologyActive = new AtomicBoolean(false);
this.stormComponentToDebug = new AtomicReference<>();
this.executorReceiveQueueMap = mkReceiveQueueMap(topologyConf, localExecutors);
- this.shortExecutorReceiveQueueMap = new HashMap<>();
this.localTaskIds = new ArrayList<>();
+ this.taskToExecutorQueue = new HashMap<>();
this.blobToLastKnownVersion = new ConcurrentHashMap<>();
for (Map.Entry<List<Long>, JCQueue> entry : executorReceiveQueueMap.entrySet()) {
- this.shortExecutorReceiveQueueMap.put(entry.getKey().get(0).intValue(), entry.getValue());
- this.localTaskIds.addAll(StormCommon.executorIdToTasks(entry.getKey()));
+ List<Integer> taskIds = StormCommon.executorIdToTasks(entry.getKey());
+ for (Integer taskId : taskIds) {
+ this.taskToExecutorQueue.put(taskId, entry.getValue());
+ }
+ this.localTaskIds.addAll(taskIds);
}
Collections.sort(localTaskIds);
this.topologyConf = topologyConf;
@@ -192,12 +193,6 @@ public class WorkerState {
this.endpointSocketLock = new ReentrantReadWriteLock();
this.cachedNodeToPortSocket = new AtomicReference<>(new HashMap<>());
this.cachedTaskToNodePort = new AtomicReference<>(new HashMap<>());
- this.taskToShortExecutor = new HashMap<>();
- for (List<Long> executor : this.localExecutors) {
- for (Integer task : StormCommon.executorIdToTasks(executor)) {
- taskToShortExecutor.put(task, executor.get(0).intValue());
- }
- }
this.suicideCallback = Utils.mkSuicideFn();
this.uptime = Utils.makeUptimeComputer();
this.defaultSharedResources = makeDefaultResources();
@@ -212,7 +207,7 @@ public class WorkerState {
}
int maxTaskId = getMaxTaskId(componentToSortedTasks);
this.workerTransfer = new WorkerTransfer(this, topologyConf, maxTaskId);
- this.bpTracker = new BackPressureTracker(workerId, localTaskIds);
+ this.bpTracker = new BackPressureTracker(workerId, taskToExecutorQueue);
this.deserializedWorkerHooks = deserializeWorkerHooks();
}
@@ -323,10 +318,6 @@ public class WorkerState {
return executorReceiveQueueMap;
}
- public Map<Integer, JCQueue> getShortExecutorReceiveQueueMap() {
- return shortExecutorReceiveQueueMap;
- }
-
public Runnable getSuicideCallback() {
return suicideCallback;
}
@@ -531,7 +522,7 @@ public class WorkerState {
for (int i = 0; i < tupleBatch.size(); i++) {
AddressedTuple tuple = tupleBatch.get(i);
- JCQueue queue = shortExecutorReceiveQueueMap.get(tuple.dest);
+ JCQueue queue = taskToExecutorQueue.get(tuple.dest);
// 1- try adding to main queue if its overflow is not empty
if (queue.isEmptyOverflow()) {
@@ -542,7 +533,7 @@ public class WorkerState {
// 2- BP detected (i.e MainQ is full). So try adding to overflow
int currOverflowCount = queue.getOverflowCount();
- if (bpTracker.recordBackPressure(tuple.dest, queue)) {
+ if (bpTracker.recordBackPressure(tuple.dest)) {
receiver.sendBackPressureStatus(bpTracker.getCurrStatus());
lastOverflowCount = currOverflowCount;
} else {
http://git-wip-us.apache.org/repos/asf/storm/blob/16132068/storm-server/src/test/java/org/apache/storm/MessagingTest.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/MessagingTest.java b/storm-server/src/test/java/org/apache/storm/MessagingTest.java
index 18cfc37..74b7388 100644
--- a/storm-server/src/test/java/org/apache/storm/MessagingTest.java
+++ b/storm-server/src/test/java/org/apache/storm/MessagingTest.java
@@ -58,4 +58,38 @@ public class MessagingTest {
Assert.assertEquals(6 * 4, Testing.readTuples(results, "2").size());
}
}
+
+ @Test
+ public void testRemoteTransportWithManyTasksInReceivingExecutor() throws Exception {
+ //STORM-3141 regression test
+ //Verify that remote worker can handle many tasks in one executor
+ Config topoConf = new Config();
+ topoConf.put(Config.TOPOLOGY_WORKERS, 2);
+ topoConf.put(Config.STORM_MESSAGING_TRANSPORT, "org.apache.storm.messaging.netty.Context");
+
+ try (ILocalCluster cluster = new LocalCluster.Builder().withSimulatedTime()
+ .withSupervisors(1).withPortsPerSupervisor(2)
+ .withDaemonConf(topoConf).build()) {
+
+ TopologyBuilder builder = new TopologyBuilder();
+ builder.setSpout("1", new TestWordSpout(true), 1);
+ builder.setBolt("2", new TestGlobalCount(), 1)
+ .setNumTasks(10)
+ .shuffleGrouping("1");
+ StormTopology stormTopology = builder.createTopology();
+
+ List<FixedTuple> fixedTuples = new ArrayList<>();
+ for (int i = 0; i < 12; i++) {
+ fixedTuples.add(new FixedTuple(Collections.singletonList("a")));
+ fixedTuples.add(new FixedTuple(Collections.singletonList("b")));
+ }
+ Map<String, List<FixedTuple>> data = new HashMap<>();
+ data.put("1", fixedTuples);
+ MockedSources mockedSources = new MockedSources(data);
+ CompleteTopologyParam completeTopologyParam = new CompleteTopologyParam();
+ completeTopologyParam.setMockedSources(mockedSources);
+ Map<String, List<FixedTuple>> results = Testing.completeTopology(cluster, stormTopology, completeTopologyParam);
+ Assert.assertEquals(6 * 4, Testing.readTuples(results, "2").size());
+ }
+ }
}
[2/3] storm git commit: Fix bug in
BackPressureTracker.recordBackPressure, add some tests
Posted by bo...@apache.org.
Fix bug in BackPressureTracker.recordBackPressure, add some tests
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/f60e943a
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/f60e943a
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/f60e943a
Branch: refs/heads/master
Commit: f60e943a388f3e828d10237782c205781c3050bc
Parents: 1613206
Author: Stig Rohde Døssing <sr...@apache.org>
Authored: Thu Jul 5 20:20:15 2018 +0200
Committer: Stig Rohde Døssing <sr...@apache.org>
Committed: Thu Jul 5 20:20:15 2018 +0200
----------------------------------------------------------------------
.../daemon/worker/BackPressureTracker.java | 2 +-
.../daemon/worker/BackPressureTrackerTest.java | 119 +++++++++++++++++++
2 files changed, 120 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/f60e943a/storm-client/src/jvm/org/apache/storm/daemon/worker/BackPressureTracker.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/BackPressureTracker.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/BackPressureTracker.java
index 8d96447..a4e87ba 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/BackPressureTracker.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/BackPressureTracker.java
@@ -59,7 +59,7 @@ public class BackPressureTracker {
* @return true if an update was recorded, false if taskId is already under BP
*/
public boolean recordBackPressure(Integer taskId) {
- return tasks.get(taskId).backpressure.getAndSet(true);
+ return tasks.get(taskId).backpressure.getAndSet(true) == false;
}
// returns true if there was a change in the BP situation
http://git-wip-us.apache.org/repos/asf/storm/blob/f60e943a/storm-client/test/jvm/org/apache/storm/daemon/worker/BackPressureTrackerTest.java
----------------------------------------------------------------------
diff --git a/storm-client/test/jvm/org/apache/storm/daemon/worker/BackPressureTrackerTest.java b/storm-client/test/jvm/org/apache/storm/daemon/worker/BackPressureTrackerTest.java
new file mode 100644
index 0000000..7e891b5
--- /dev/null
+++ b/storm-client/test/jvm/org/apache/storm/daemon/worker/BackPressureTrackerTest.java
@@ -0,0 +1,119 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.daemon.worker;
+
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+import org.apache.storm.messaging.netty.BackPressureStatus;
+import org.apache.storm.shade.org.apache.curator.shaded.com.google.common.collect.ImmutableMap;
+import org.apache.storm.utils.JCQueue;
+import org.junit.Test;
+
+public class BackPressureTrackerTest {
+
+ private static final String WORKER_ID = "worker";
+
+ @Test
+ public void testGetBackpressure() {
+ int taskIdNoBackPressure = 1;
+ JCQueue noBackPressureQueue = mock(JCQueue.class);
+ BackPressureTracker tracker = new BackPressureTracker(WORKER_ID,
+ Collections.singletonMap(taskIdNoBackPressure, noBackPressureQueue));
+
+ BackPressureStatus status = tracker.getCurrStatus();
+
+ assertThat(status.workerId, is(WORKER_ID));
+ assertThat(status.nonBpTasks, contains(taskIdNoBackPressure));
+ assertThat(status.bpTasks, is(empty()));
+ }
+
+ @Test
+ public void testSetBackpressure() {
+ int taskIdNoBackPressure = 1;
+ JCQueue noBackPressureQueue = mock(JCQueue.class);
+ int taskIdBackPressure = 2;
+ JCQueue backPressureQueue = mock(JCQueue.class);
+ BackPressureTracker tracker = new BackPressureTracker(WORKER_ID, ImmutableMap.of(
+ taskIdNoBackPressure, noBackPressureQueue,
+ taskIdBackPressure, backPressureQueue));
+
+ boolean backpressureChanged = tracker.recordBackPressure(taskIdBackPressure);
+ BackPressureStatus status = tracker.getCurrStatus();
+
+ assertThat(backpressureChanged, is(true));
+ assertThat(status.workerId, is(WORKER_ID));
+ assertThat(status.nonBpTasks, contains(taskIdNoBackPressure));
+ assertThat(status.bpTasks, contains(taskIdBackPressure));
+ }
+
+ @Test
+ public void testSetBackpressureWithExistingBackpressure() {
+ int taskId = 1;
+ JCQueue queue = mock(JCQueue.class);
+ BackPressureTracker tracker = new BackPressureTracker(WORKER_ID, ImmutableMap.of(
+ taskId, queue));
+ tracker.recordBackPressure(taskId);
+
+ boolean backpressureChanged = tracker.recordBackPressure(taskId);
+ BackPressureStatus status = tracker.getCurrStatus();
+
+ assertThat(backpressureChanged, is(false));
+ assertThat(status.workerId, is(WORKER_ID));
+ assertThat(status.bpTasks, contains(taskId));
+ }
+
+ @Test
+ public void testRefreshBackpressureWithEmptyOverflow() {
+ int taskId = 1;
+ JCQueue queue = mock(JCQueue.class);
+ when(queue.isEmptyOverflow()).thenReturn(true);
+ BackPressureTracker tracker = new BackPressureTracker(WORKER_ID, ImmutableMap.of(
+ taskId, queue));
+ tracker.recordBackPressure(taskId);
+
+ boolean backpressureChanged = tracker.refreshBpTaskList();
+ BackPressureStatus status = tracker.getCurrStatus();
+
+ assertThat(backpressureChanged, is(true));
+ assertThat(status.workerId, is(WORKER_ID));
+ assertThat(status.nonBpTasks, contains(taskId));
+ }
+
+ @Test
+ public void testRefreshBackPressureWithNonEmptyOverflow() {
+ int taskId = 1;
+ JCQueue queue = mock(JCQueue.class);
+ when(queue.isEmptyOverflow()).thenReturn(false);
+ BackPressureTracker tracker = new BackPressureTracker(WORKER_ID, ImmutableMap.of(
+ taskId, queue));
+ tracker.recordBackPressure(taskId);
+
+ boolean backpressureChanged = tracker.refreshBpTaskList();
+ BackPressureStatus status = tracker.getCurrStatus();
+
+ assertThat(backpressureChanged, is(false));
+ assertThat(status.workerId, is(WORKER_ID));
+ assertThat(status.bpTasks, contains(taskId));
+ }
+
+}
[3/3] storm git commit: Merge branch 'STORM-3141' of
https://github.com/srdo/storm into STORM-3141
Posted by bo...@apache.org.
Merge branch 'STORM-3141' of https://github.com/srdo/storm into STORM-3141
STORM-3141: Fix NPE in WorkerState.transferLocalBatch
This closes #2750
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/07c795af
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/07c795af
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/07c795af
Branch: refs/heads/master
Commit: 07c795af5dd398832049262d25858ecc846f4ac5
Parents: 2d7c7d3 f60e943
Author: Robert Evans <ev...@yahoo-inc.com>
Authored: Tue Jul 10 14:03:36 2018 -0500
Committer: Robert Evans <ev...@yahoo-inc.com>
Committed: Tue Jul 10 14:03:36 2018 -0500
----------------------------------------------------------------------
.../daemon/worker/BackPressureTracker.java | 63 ++++++----
.../apache/storm/daemon/worker/WorkerState.java | 31 ++---
.../daemon/worker/BackPressureTrackerTest.java | 119 +++++++++++++++++++
.../java/org/apache/storm/MessagingTest.java | 34 ++++++
4 files changed, 203 insertions(+), 44 deletions(-)
----------------------------------------------------------------------