You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2018/07/10 20:28:26 UTC

[1/3] storm git commit: STORM-3141: Fix NPE in WorkerState.transferLocalBatch, and refactor BackpressureTracker to get rid of placeholder JCQueue

Repository: storm
Updated Branches:
  refs/heads/master 2d7c7d316 -> 07c795af5


STORM-3141: Fix NPE in WorkerState.transferLocalBatch, and refactor BackpressureTracker to get rid of placeholder JCQueue


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/16132068
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/16132068
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/16132068

Branch: refs/heads/master
Commit: 1613206802ffa00f1b6b9ab56a741e86aeff6a5e
Parents: 26d2f95
Author: Stig Rohde Døssing <sr...@apache.org>
Authored: Mon Jul 2 21:42:54 2018 +0200
Committer: Stig Rohde Døssing <sr...@apache.org>
Committed: Mon Jul 2 21:56:27 2018 +0200

----------------------------------------------------------------------
 .../daemon/worker/BackPressureTracker.java      | 63 ++++++++++++--------
 .../apache/storm/daemon/worker/WorkerState.java | 31 ++++------
 .../java/org/apache/storm/MessagingTest.java    | 34 +++++++++++
 3 files changed, 84 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/16132068/storm-client/src/jvm/org/apache/storm/daemon/worker/BackPressureTracker.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/BackPressureTracker.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/BackPressureTracker.java
index bc396aa..8d96447 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/BackPressureTracker.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/BackPressureTracker.java
@@ -22,39 +22,35 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
-import org.apache.storm.Constants;
 import org.apache.storm.messaging.netty.BackPressureStatus;
 import org.apache.storm.utils.JCQueue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.storm.Constants.SYSTEM_TASK_ID;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import java.util.stream.Collectors;
+import org.apache.storm.shade.org.apache.commons.lang.builder.ToStringBuilder;
+import org.apache.storm.shade.org.apache.commons.lang.builder.ToStringStyle;
 
 /***
- *   Tracks the BackPressure status using a Map<TaskId, JCQueue>.
- *   Special value NONE, is used to indicate that the task is not under BackPressure
- *   ConcurrentHashMap does not allow storing null values, so we use the special value NONE instead.
+ *   Tracks the BackPressure status.
  */
 public class BackPressureTracker {
     static final Logger LOG = LoggerFactory.getLogger(BackPressureTracker.class);
-    private static final JCQueue NONE = new JCQueue("NoneQ", 2, 0, 1, null,
-                                                    "none", Constants.SYSTEM_COMPONENT_ID, -1, 0) {
-    };
-    private final Map<Integer, JCQueue> tasks = new ConcurrentHashMap<>(); // updates are more frequent than iteration
+    private final Map<Integer, BackpressureState> tasks;
     private final String workerId;
 
-    public BackPressureTracker(String workerId, List<Integer> allLocalTasks) {
+    public BackPressureTracker(String workerId, Map<Integer, JCQueue> localTasksToQueues) {
         this.workerId = workerId;
-        for (Integer taskId : allLocalTasks) {
-            if (taskId != SYSTEM_TASK_ID) {
-                tasks.put(taskId, NONE);  // all tasks are considered to be not under BP initially
-            }
-        }
+        this.tasks = localTasksToQueues.entrySet().stream()
+            .collect(Collectors.toMap(
+                entry -> entry.getKey(),
+                entry -> new BackpressureState(entry.getValue())));
     }
 
     private void recordNoBackPressure(Integer taskId) {
-        tasks.put(taskId, NONE);
+        tasks.get(taskId).backpressure.set(false);
     }
 
     /***
@@ -62,16 +58,17 @@ public class BackPressureTracker {
      * This is called by transferLocalBatch() on NettyWorker thread
      * @return true if an update was recorded, false if taskId is already under BP
      */
-    public boolean recordBackPressure(Integer taskId, JCQueue recvQ) {
-        return tasks.put(taskId, recvQ) == NONE;
+    public boolean recordBackPressure(Integer taskId) {
+        return tasks.get(taskId).backpressure.getAndSet(true);
     }
 
     // returns true if there was a change in the BP situation
     public boolean refreshBpTaskList() {
         boolean changed = false;
         LOG.debug("Running Back Pressure status change check");
-        for (Entry<Integer, JCQueue> entry : tasks.entrySet()) {
-            if (entry.getValue() != NONE && entry.getValue().isEmptyOverflow()) {
+        for (Entry<Integer, BackpressureState> entry : tasks.entrySet()) {
+            BackpressureState state = entry.getValue();
+            if (state.backpressure.get() && state.queue.isEmptyOverflow()) {
                 recordNoBackPressure(entry.getKey());
                 changed = true;
             }
@@ -83,9 +80,9 @@ public class BackPressureTracker {
         ArrayList<Integer> bpTasks = new ArrayList<>(tasks.size());
         ArrayList<Integer> nonBpTasks = new ArrayList<>(tasks.size());
 
-        for (Entry<Integer, JCQueue> entry : tasks.entrySet()) {
-            JCQueue q = entry.getValue();
-            if (q != NONE) {
+        for (Entry<Integer, BackpressureState> entry : tasks.entrySet()) {
+            boolean backpressure = entry.getValue().backpressure.get();
+            if (backpressure) {
                 bpTasks.add(entry.getKey());
             } else {
                 nonBpTasks.add(entry.getKey());
@@ -93,4 +90,22 @@ public class BackPressureTracker {
         }
         return new BackPressureStatus(workerId, bpTasks, nonBpTasks);
     }
+    
+    private static class BackpressureState {
+        private final JCQueue queue;
+        //No task is under backpressure initially
+        private final AtomicBoolean backpressure = new AtomicBoolean(false);
+
+        public BackpressureState(JCQueue queue) {
+            this.queue = queue;
+        }
+
+        @Override
+        public String toString() {
+            return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
+                .append(queue)
+                .append(backpressure)
+                .toString();
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/16132068/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
index ab814da..9510bc0 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
@@ -114,11 +114,9 @@ public class WorkerState {
     final ReentrantReadWriteLock endpointSocketLock;
     final AtomicReference<Map<Integer, NodeInfo>> cachedTaskToNodePort;
     final AtomicReference<Map<NodeInfo, IConnection>> cachedNodeToPortSocket;
-    final Map<List<Long>, JCQueue> executorReceiveQueueMap;
     // executor id is in form [start_task_id end_task_id]
-    // short executor id is start_task_id
-    final Map<Integer, JCQueue> shortExecutorReceiveQueueMap;
-    final Map<Integer, Integer> taskToShortExecutor;
+    final Map<List<Long>, JCQueue> executorReceiveQueueMap;
+    final Map<Integer, JCQueue> taskToExecutorQueue;
     final Runnable suicideCallback;
     final Utils.UptimeComputer uptime;
     final Map<String, Object> defaultSharedResources;
@@ -166,12 +164,15 @@ public class WorkerState {
         this.isTopologyActive = new AtomicBoolean(false);
         this.stormComponentToDebug = new AtomicReference<>();
         this.executorReceiveQueueMap = mkReceiveQueueMap(topologyConf, localExecutors);
-        this.shortExecutorReceiveQueueMap = new HashMap<>();
         this.localTaskIds = new ArrayList<>();
+        this.taskToExecutorQueue = new HashMap<>();
         this.blobToLastKnownVersion = new ConcurrentHashMap<>();
         for (Map.Entry<List<Long>, JCQueue> entry : executorReceiveQueueMap.entrySet()) {
-            this.shortExecutorReceiveQueueMap.put(entry.getKey().get(0).intValue(), entry.getValue());
-            this.localTaskIds.addAll(StormCommon.executorIdToTasks(entry.getKey()));
+            List<Integer> taskIds = StormCommon.executorIdToTasks(entry.getKey());
+            for (Integer taskId : taskIds) {
+                this.taskToExecutorQueue.put(taskId, entry.getValue());
+            }
+            this.localTaskIds.addAll(taskIds);
         }
         Collections.sort(localTaskIds);
         this.topologyConf = topologyConf;
@@ -192,12 +193,6 @@ public class WorkerState {
         this.endpointSocketLock = new ReentrantReadWriteLock();
         this.cachedNodeToPortSocket = new AtomicReference<>(new HashMap<>());
         this.cachedTaskToNodePort = new AtomicReference<>(new HashMap<>());
-        this.taskToShortExecutor = new HashMap<>();
-        for (List<Long> executor : this.localExecutors) {
-            for (Integer task : StormCommon.executorIdToTasks(executor)) {
-                taskToShortExecutor.put(task, executor.get(0).intValue());
-            }
-        }
         this.suicideCallback = Utils.mkSuicideFn();
         this.uptime = Utils.makeUptimeComputer();
         this.defaultSharedResources = makeDefaultResources();
@@ -212,7 +207,7 @@ public class WorkerState {
         }
         int maxTaskId = getMaxTaskId(componentToSortedTasks);
         this.workerTransfer = new WorkerTransfer(this, topologyConf, maxTaskId);
-        this.bpTracker = new BackPressureTracker(workerId, localTaskIds);
+        this.bpTracker = new BackPressureTracker(workerId, taskToExecutorQueue);
         this.deserializedWorkerHooks = deserializeWorkerHooks();
     }
 
@@ -323,10 +318,6 @@ public class WorkerState {
         return executorReceiveQueueMap;
     }
 
-    public Map<Integer, JCQueue> getShortExecutorReceiveQueueMap() {
-        return shortExecutorReceiveQueueMap;
-    }
-
     public Runnable getSuicideCallback() {
         return suicideCallback;
     }
@@ -531,7 +522,7 @@ public class WorkerState {
 
         for (int i = 0; i < tupleBatch.size(); i++) {
             AddressedTuple tuple = tupleBatch.get(i);
-            JCQueue queue = shortExecutorReceiveQueueMap.get(tuple.dest);
+            JCQueue queue = taskToExecutorQueue.get(tuple.dest);
 
             // 1- try adding to main queue if its overflow is not empty
             if (queue.isEmptyOverflow()) {
@@ -542,7 +533,7 @@ public class WorkerState {
 
             // 2- BP detected (i.e MainQ is full). So try adding to overflow
             int currOverflowCount = queue.getOverflowCount();
-            if (bpTracker.recordBackPressure(tuple.dest, queue)) {
+            if (bpTracker.recordBackPressure(tuple.dest)) {
                 receiver.sendBackPressureStatus(bpTracker.getCurrStatus());
                 lastOverflowCount = currOverflowCount;
             } else {

http://git-wip-us.apache.org/repos/asf/storm/blob/16132068/storm-server/src/test/java/org/apache/storm/MessagingTest.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/MessagingTest.java b/storm-server/src/test/java/org/apache/storm/MessagingTest.java
index 18cfc37..74b7388 100644
--- a/storm-server/src/test/java/org/apache/storm/MessagingTest.java
+++ b/storm-server/src/test/java/org/apache/storm/MessagingTest.java
@@ -58,4 +58,38 @@ public class MessagingTest {
             Assert.assertEquals(6 * 4, Testing.readTuples(results, "2").size());
         }
     }
+    
+    @Test
+    public void testRemoteTransportWithManyTasksInReceivingExecutor() throws Exception {
+        //STORM-3141 regression test
+        //Verify that remote worker can handle many tasks in one executor
+        Config topoConf = new Config();
+        topoConf.put(Config.TOPOLOGY_WORKERS, 2);
+        topoConf.put(Config.STORM_MESSAGING_TRANSPORT, "org.apache.storm.messaging.netty.Context");
+
+        try (ILocalCluster cluster = new LocalCluster.Builder().withSimulatedTime()
+                                                               .withSupervisors(1).withPortsPerSupervisor(2)
+                                                               .withDaemonConf(topoConf).build()) {
+
+            TopologyBuilder builder = new TopologyBuilder();
+            builder.setSpout("1", new TestWordSpout(true), 1);
+            builder.setBolt("2", new TestGlobalCount(), 1)
+                .setNumTasks(10)
+                .shuffleGrouping("1");
+            StormTopology stormTopology = builder.createTopology();
+
+            List<FixedTuple> fixedTuples = new ArrayList<>();
+            for (int i = 0; i < 12; i++) {
+                fixedTuples.add(new FixedTuple(Collections.singletonList("a")));
+                fixedTuples.add(new FixedTuple(Collections.singletonList("b")));
+            }
+            Map<String, List<FixedTuple>> data = new HashMap<>();
+            data.put("1", fixedTuples);
+            MockedSources mockedSources = new MockedSources(data);
+            CompleteTopologyParam completeTopologyParam = new CompleteTopologyParam();
+            completeTopologyParam.setMockedSources(mockedSources);
+            Map<String, List<FixedTuple>> results = Testing.completeTopology(cluster, stormTopology, completeTopologyParam);
+            Assert.assertEquals(6 * 4, Testing.readTuples(results, "2").size());
+        }
+    }
 }


[2/3] storm git commit: Fix bug in BackPressureTracker.recordBackPressure, add some tests

Posted by bo...@apache.org.
Fix bug in BackPressureTracker.recordBackPressure, add some tests


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/f60e943a
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/f60e943a
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/f60e943a

Branch: refs/heads/master
Commit: f60e943a388f3e828d10237782c205781c3050bc
Parents: 1613206
Author: Stig Rohde Døssing <sr...@apache.org>
Authored: Thu Jul 5 20:20:15 2018 +0200
Committer: Stig Rohde Døssing <sr...@apache.org>
Committed: Thu Jul 5 20:20:15 2018 +0200

----------------------------------------------------------------------
 .../daemon/worker/BackPressureTracker.java      |   2 +-
 .../daemon/worker/BackPressureTrackerTest.java  | 119 +++++++++++++++++++
 2 files changed, 120 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/f60e943a/storm-client/src/jvm/org/apache/storm/daemon/worker/BackPressureTracker.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/BackPressureTracker.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/BackPressureTracker.java
index 8d96447..a4e87ba 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/BackPressureTracker.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/BackPressureTracker.java
@@ -59,7 +59,7 @@ public class BackPressureTracker {
      * @return true if an update was recorded, false if taskId is already under BP
      */
     public boolean recordBackPressure(Integer taskId) {
-        return tasks.get(taskId).backpressure.getAndSet(true);
+        return tasks.get(taskId).backpressure.getAndSet(true) == false;
     }
 
     // returns true if there was a change in the BP situation

http://git-wip-us.apache.org/repos/asf/storm/blob/f60e943a/storm-client/test/jvm/org/apache/storm/daemon/worker/BackPressureTrackerTest.java
----------------------------------------------------------------------
diff --git a/storm-client/test/jvm/org/apache/storm/daemon/worker/BackPressureTrackerTest.java b/storm-client/test/jvm/org/apache/storm/daemon/worker/BackPressureTrackerTest.java
new file mode 100644
index 0000000..7e891b5
--- /dev/null
+++ b/storm-client/test/jvm/org/apache/storm/daemon/worker/BackPressureTrackerTest.java
@@ -0,0 +1,119 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.daemon.worker;
+
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+import org.apache.storm.messaging.netty.BackPressureStatus;
+import org.apache.storm.shade.org.apache.curator.shaded.com.google.common.collect.ImmutableMap;
+import org.apache.storm.utils.JCQueue;
+import org.junit.Test;
+
+public class BackPressureTrackerTest {
+
+    private static final String WORKER_ID = "worker";
+
+    @Test
+    public void testGetBackpressure() {
+        int taskIdNoBackPressure = 1;
+        JCQueue noBackPressureQueue = mock(JCQueue.class);
+        BackPressureTracker tracker = new BackPressureTracker(WORKER_ID,
+            Collections.singletonMap(taskIdNoBackPressure, noBackPressureQueue));
+
+        BackPressureStatus status = tracker.getCurrStatus();
+
+        assertThat(status.workerId, is(WORKER_ID));
+        assertThat(status.nonBpTasks, contains(taskIdNoBackPressure));
+        assertThat(status.bpTasks, is(empty()));
+    }
+
+    @Test
+    public void testSetBackpressure() {
+        int taskIdNoBackPressure = 1;
+        JCQueue noBackPressureQueue = mock(JCQueue.class);
+        int taskIdBackPressure = 2;
+        JCQueue backPressureQueue = mock(JCQueue.class);
+        BackPressureTracker tracker = new BackPressureTracker(WORKER_ID, ImmutableMap.of(
+            taskIdNoBackPressure, noBackPressureQueue,
+            taskIdBackPressure, backPressureQueue));
+
+        boolean backpressureChanged = tracker.recordBackPressure(taskIdBackPressure);
+        BackPressureStatus status = tracker.getCurrStatus();
+
+        assertThat(backpressureChanged, is(true));
+        assertThat(status.workerId, is(WORKER_ID));
+        assertThat(status.nonBpTasks, contains(taskIdNoBackPressure));
+        assertThat(status.bpTasks, contains(taskIdBackPressure));
+    }
+
+    @Test
+    public void testSetBackpressureWithExistingBackpressure() {
+        int taskId = 1;
+        JCQueue queue = mock(JCQueue.class);
+        BackPressureTracker tracker = new BackPressureTracker(WORKER_ID, ImmutableMap.of(
+            taskId, queue));
+        tracker.recordBackPressure(taskId);
+
+        boolean backpressureChanged = tracker.recordBackPressure(taskId);
+        BackPressureStatus status = tracker.getCurrStatus();
+
+        assertThat(backpressureChanged, is(false));
+        assertThat(status.workerId, is(WORKER_ID));
+        assertThat(status.bpTasks, contains(taskId));
+    }
+
+    @Test
+    public void testRefreshBackpressureWithEmptyOverflow() {
+        int taskId = 1;
+        JCQueue queue = mock(JCQueue.class);
+        when(queue.isEmptyOverflow()).thenReturn(true);
+        BackPressureTracker tracker = new BackPressureTracker(WORKER_ID, ImmutableMap.of(
+            taskId, queue));
+        tracker.recordBackPressure(taskId);
+
+        boolean backpressureChanged = tracker.refreshBpTaskList();
+        BackPressureStatus status = tracker.getCurrStatus();
+
+        assertThat(backpressureChanged, is(true));
+        assertThat(status.workerId, is(WORKER_ID));
+        assertThat(status.nonBpTasks, contains(taskId));
+    }
+
+    @Test
+    public void testRefreshBackPressureWithNonEmptyOverflow() {
+        int taskId = 1;
+        JCQueue queue = mock(JCQueue.class);
+        when(queue.isEmptyOverflow()).thenReturn(false);
+        BackPressureTracker tracker = new BackPressureTracker(WORKER_ID, ImmutableMap.of(
+            taskId, queue));
+        tracker.recordBackPressure(taskId);
+
+        boolean backpressureChanged = tracker.refreshBpTaskList();
+        BackPressureStatus status = tracker.getCurrStatus();
+
+        assertThat(backpressureChanged, is(false));
+        assertThat(status.workerId, is(WORKER_ID));
+        assertThat(status.bpTasks, contains(taskId));
+    }
+
+}


[3/3] storm git commit: Merge branch 'STORM-3141' of https://github.com/srdo/storm into STORM-3141

Posted by bo...@apache.org.
Merge branch 'STORM-3141' of https://github.com/srdo/storm into STORM-3141

STORM-3141: Fix NPE in WorkerState.transferLocalBatch

This closes #2750


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/07c795af
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/07c795af
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/07c795af

Branch: refs/heads/master
Commit: 07c795af5dd398832049262d25858ecc846f4ac5
Parents: 2d7c7d3 f60e943
Author: Robert Evans <ev...@yahoo-inc.com>
Authored: Tue Jul 10 14:03:36 2018 -0500
Committer: Robert Evans <ev...@yahoo-inc.com>
Committed: Tue Jul 10 14:03:36 2018 -0500

----------------------------------------------------------------------
 .../daemon/worker/BackPressureTracker.java      |  63 ++++++----
 .../apache/storm/daemon/worker/WorkerState.java |  31 ++---
 .../daemon/worker/BackPressureTrackerTest.java  | 119 +++++++++++++++++++
 .../java/org/apache/storm/MessagingTest.java    |  34 ++++++
 4 files changed, 203 insertions(+), 44 deletions(-)
----------------------------------------------------------------------