You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2019/10/01 14:38:41 UTC

[storm] branch master updated: STORM-3510: Track overflow count per taskId for resending backpressur… (#3131)

This is an automated email from the ASF dual-hosted git repository.

srdo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git


The following commit(s) were added to refs/heads/master by this push:
     new c78f1aa  STORM-3510: Track overflow count per taskId for resending backpressur… (#3131)
c78f1aa is described below

commit c78f1aaa8fcd17eb5c8f45f5dbb90e30f99e19bb
Author: cjljohnson <cj...@gmail.com>
AuthorDate: Tue Oct 1 15:38:28 2019 +0100

    STORM-3510: Track overflow count per taskId for resending backpressur… (#3131)
    
    * STORM-3510: Track overflow count per taskId for resending backpressure status
---
 .../storm/daemon/worker/BackPressureTracker.java   | 29 ++++++++++++++----
 .../apache/storm/daemon/worker/WorkerState.java    | 14 +++++----
 .../daemon/worker/BackPressureTrackerTest.java     | 35 ++++++++++++++++++----
 3 files changed, 60 insertions(+), 18 deletions(-)

diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/BackPressureTracker.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/BackPressureTracker.java
index dae5cca..3c590e5 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/BackPressureTracker.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/BackPressureTracker.java
@@ -49,8 +49,12 @@ public class BackPressureTracker {
                 entry -> new BackpressureState(entry.getValue())));
     }
 
-    private void recordNoBackPressure(Integer taskId) {
-        tasks.get(taskId).backpressure.set(false);
+    public BackpressureState getBackpressureState(Integer taskId) {
+        return tasks.get(taskId);
+    }
+
+    private void recordNoBackPressure(BackpressureState state) {
+        state.backpressure.set(false);
     }
 
     /**
@@ -60,8 +64,8 @@ public class BackPressureTracker {
      *
      * @return true if an update was recorded, false if taskId is already under BP
      */
-    public boolean recordBackPressure(Integer taskId) {
-        return tasks.get(taskId).backpressure.getAndSet(true) == false;
+    public boolean recordBackPressure(BackpressureState state) {
+        return state.backpressure.getAndSet(true) == false;
     }
 
     // returns true if there was a change in the BP situation
@@ -71,7 +75,7 @@ public class BackPressureTracker {
         for (Entry<Integer, BackpressureState> entry : tasks.entrySet()) {
             BackpressureState state = entry.getValue();
             if (state.backpressure.get() && state.queue.isEmptyOverflow()) {
-                recordNoBackPressure(entry.getKey());
+                recordNoBackPressure(state);
                 changed = true;
             }
         }
@@ -95,11 +99,24 @@ public class BackPressureTracker {
         }
         return new BackPressureStatus(workerId, bpTasks, nonBpTasks);
     }
+
+    public int getLastOverflowCount(BackpressureState state) {
+        return state.lastOverflowCount;
+    }
+
+    public void setLastOverflowCount(BackpressureState state, int value) {
+        state.lastOverflowCount = value;
+    }
+
+
     
-    private static class BackpressureState {
+    public static class BackpressureState {
         private final JCQueue queue;
         //No task is under backpressure initially
         private final AtomicBoolean backpressure = new AtomicBoolean(false);
+        //The overflow count last time BP status was sent
+        private int lastOverflowCount = 0;
+
 
         BackpressureState(JCQueue queue) {
             this.queue = queue;
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
index f380769..eaab4e9 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
@@ -42,6 +42,7 @@ import org.apache.storm.cluster.IStormClusterState;
 import org.apache.storm.cluster.VersionedData;
 import org.apache.storm.daemon.StormCommon;
 import org.apache.storm.daemon.supervisor.AdvancedFSOps;
+import org.apache.storm.daemon.worker.BackPressureTracker.BackpressureState;
 import org.apache.storm.executor.IRunningExecutor;
 import org.apache.storm.generated.Assignment;
 import org.apache.storm.generated.DebugOptions;
@@ -88,6 +89,7 @@ public class WorkerState {
 
     private static final Logger LOG = LoggerFactory.getLogger(WorkerState.class);
     private static final long LOAD_REFRESH_INTERVAL_MS = 5000L;
+    private static final int RESEND_BACKPRESSURE_SIZE = 10000;
     private static long dropCount = 0;
     final Map<String, Object> conf;
     final IContext mqContext;
@@ -533,8 +535,6 @@ public class WorkerState {
     // Receives msgs from remote workers and feeds them to local executors. If any receiving local executor is under Back Pressure,
     // informs other workers about back pressure situation. Runs in the NettyWorker thread.
     private void transferLocalBatch(ArrayList<AddressedTuple> tupleBatch) {
-        int lastOverflowCount = 0; // overflowQ size at the time the last BPStatus was sent
-
         for (int i = 0; i < tupleBatch.size(); i++) {
             AddressedTuple tuple = tupleBatch.get(i);
             JCQueue queue = taskToExecutorQueue.get(tuple.dest);
@@ -548,16 +548,18 @@ public class WorkerState {
 
             // 2- BP detected (i.e MainQ is full). So try adding to overflow
             int currOverflowCount = queue.getOverflowCount();
-            if (bpTracker.recordBackPressure(tuple.dest)) {
+            // get BP state object so only have to lookup once
+            BackpressureState bpState = bpTracker.getBackpressureState(tuple.dest);
+            if (bpTracker.recordBackPressure(bpState)) {
                 receiver.sendBackPressureStatus(bpTracker.getCurrStatus());
-                lastOverflowCount = currOverflowCount;
+                bpTracker.setLastOverflowCount(bpState, currOverflowCount);
             } else {
 
-                if (currOverflowCount - lastOverflowCount > 10000) {
+                if (currOverflowCount - bpTracker.getLastOverflowCount(bpState) > RESEND_BACKPRESSURE_SIZE) {
                     // resend BP status, in case prev notification was missed or reordered
                     BackPressureStatus bpStatus = bpTracker.getCurrStatus();
                     receiver.sendBackPressureStatus(bpStatus);
-                    lastOverflowCount = currOverflowCount;
+                    bpTracker.setLastOverflowCount(bpState, currOverflowCount);
                     LOG.debug("Re-sent BackPressure Status. OverflowCount = {}, BP Status ID = {}. ", currOverflowCount, bpStatus.id);
                 }
             }
diff --git a/storm-client/test/jvm/org/apache/storm/daemon/worker/BackPressureTrackerTest.java b/storm-client/test/jvm/org/apache/storm/daemon/worker/BackPressureTrackerTest.java
index 7e891b5..f642c54 100644
--- a/storm-client/test/jvm/org/apache/storm/daemon/worker/BackPressureTrackerTest.java
+++ b/storm-client/test/jvm/org/apache/storm/daemon/worker/BackPressureTrackerTest.java
@@ -24,6 +24,8 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import java.util.Collections;
+
+import org.apache.storm.daemon.worker.BackPressureTracker.BackpressureState;
 import org.apache.storm.messaging.netty.BackPressureStatus;
 import org.apache.storm.shade.org.apache.curator.shaded.com.google.common.collect.ImmutableMap;
 import org.apache.storm.utils.JCQueue;
@@ -38,7 +40,7 @@ public class BackPressureTrackerTest {
         int taskIdNoBackPressure = 1;
         JCQueue noBackPressureQueue = mock(JCQueue.class);
         BackPressureTracker tracker = new BackPressureTracker(WORKER_ID,
-            Collections.singletonMap(taskIdNoBackPressure, noBackPressureQueue));
+                Collections.singletonMap(taskIdNoBackPressure, noBackPressureQueue));
 
         BackPressureStatus status = tracker.getCurrStatus();
 
@@ -57,7 +59,8 @@ public class BackPressureTrackerTest {
             taskIdNoBackPressure, noBackPressureQueue,
             taskIdBackPressure, backPressureQueue));
 
-        boolean backpressureChanged = tracker.recordBackPressure(taskIdBackPressure);
+        BackpressureState state = tracker.getBackpressureState(taskIdBackPressure);
+        boolean backpressureChanged = tracker.recordBackPressure(state);
         BackPressureStatus status = tracker.getCurrStatus();
 
         assertThat(backpressureChanged, is(true));
@@ -72,9 +75,10 @@ public class BackPressureTrackerTest {
         JCQueue queue = mock(JCQueue.class);
         BackPressureTracker tracker = new BackPressureTracker(WORKER_ID, ImmutableMap.of(
             taskId, queue));
-        tracker.recordBackPressure(taskId);
+        BackpressureState state = tracker.getBackpressureState(taskId);
+        tracker.recordBackPressure(state);
 
-        boolean backpressureChanged = tracker.recordBackPressure(taskId);
+        boolean backpressureChanged = tracker.recordBackPressure(state);
         BackPressureStatus status = tracker.getCurrStatus();
 
         assertThat(backpressureChanged, is(false));
@@ -89,7 +93,8 @@ public class BackPressureTrackerTest {
         when(queue.isEmptyOverflow()).thenReturn(true);
         BackPressureTracker tracker = new BackPressureTracker(WORKER_ID, ImmutableMap.of(
             taskId, queue));
-        tracker.recordBackPressure(taskId);
+        BackpressureState state = tracker.getBackpressureState(taskId);
+        tracker.recordBackPressure(state);
 
         boolean backpressureChanged = tracker.refreshBpTaskList();
         BackPressureStatus status = tracker.getCurrStatus();
@@ -106,7 +111,8 @@ public class BackPressureTrackerTest {
         when(queue.isEmptyOverflow()).thenReturn(false);
         BackPressureTracker tracker = new BackPressureTracker(WORKER_ID, ImmutableMap.of(
             taskId, queue));
-        tracker.recordBackPressure(taskId);
+        BackpressureState state = tracker.getBackpressureState(taskId);
+        tracker.recordBackPressure(state);
 
         boolean backpressureChanged = tracker.refreshBpTaskList();
         BackPressureStatus status = tracker.getCurrStatus();
@@ -116,4 +122,21 @@ public class BackPressureTrackerTest {
         assertThat(status.bpTasks, contains(taskId));
     }
 
+    @Test
+    public void testSetLastOverflowCount() {
+        int taskId = 1;
+        int overflow = 5;
+        JCQueue queue = mock(JCQueue.class);
+        BackPressureTracker tracker = new BackPressureTracker(WORKER_ID, ImmutableMap.of(
+            taskId, queue));
+        BackpressureState state = tracker.getBackpressureState(taskId);
+        tracker.recordBackPressure(state);
+        tracker.setLastOverflowCount(state, overflow);
+
+        BackpressureState retrievedState = tracker.getBackpressureState(taskId);
+        int lastOverflowCount = tracker.getLastOverflowCount(retrievedState);
+
+        assertThat(lastOverflowCount, is(overflow));
+    }
+
 }