You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2019/10/01 14:38:41 UTC
[storm] branch master updated: STORM-3510: Track overflow count per taskId for resending backpressur… (#3131)
This is an automated email from the ASF dual-hosted git repository.
srdo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git
The following commit(s) were added to refs/heads/master by this push:
new c78f1aa STORM-3510: Track overflow count per taskId for resending backpressur… (#3131)
c78f1aa is described below
commit c78f1aaa8fcd17eb5c8f45f5dbb90e30f99e19bb
Author: cjljohnson <cj...@gmail.com>
AuthorDate: Tue Oct 1 15:38:28 2019 +0100
STORM-3510: Track overflow count per taskId for resending backpressur… (#3131)
* STORM-3510: Track overflow count per taskId for resending backpressure status
---
.../storm/daemon/worker/BackPressureTracker.java | 29 ++++++++++++++----
.../apache/storm/daemon/worker/WorkerState.java | 14 +++++----
.../daemon/worker/BackPressureTrackerTest.java | 35 ++++++++++++++++++----
3 files changed, 60 insertions(+), 18 deletions(-)
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/BackPressureTracker.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/BackPressureTracker.java
index dae5cca..3c590e5 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/BackPressureTracker.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/BackPressureTracker.java
@@ -49,8 +49,12 @@ public class BackPressureTracker {
entry -> new BackpressureState(entry.getValue())));
}
- private void recordNoBackPressure(Integer taskId) {
- tasks.get(taskId).backpressure.set(false);
+ public BackpressureState getBackpressureState(Integer taskId) {
+ return tasks.get(taskId);
+ }
+
+ private void recordNoBackPressure(BackpressureState state) {
+ state.backpressure.set(false);
}
/**
@@ -60,8 +64,8 @@ public class BackPressureTracker {
*
* @return true if an update was recorded, false if taskId is already under BP
*/
- public boolean recordBackPressure(Integer taskId) {
- return tasks.get(taskId).backpressure.getAndSet(true) == false;
+ public boolean recordBackPressure(BackpressureState state) {
+ return state.backpressure.getAndSet(true) == false;
}
// returns true if there was a change in the BP situation
@@ -71,7 +75,7 @@ public class BackPressureTracker {
for (Entry<Integer, BackpressureState> entry : tasks.entrySet()) {
BackpressureState state = entry.getValue();
if (state.backpressure.get() && state.queue.isEmptyOverflow()) {
- recordNoBackPressure(entry.getKey());
+ recordNoBackPressure(state);
changed = true;
}
}
@@ -95,11 +99,24 @@ public class BackPressureTracker {
}
return new BackPressureStatus(workerId, bpTasks, nonBpTasks);
}
+
+ public int getLastOverflowCount(BackpressureState state) {
+ return state.lastOverflowCount;
+ }
+
+ public void setLastOverflowCount(BackpressureState state, int value) {
+ state.lastOverflowCount = value;
+ }
+
+
- private static class BackpressureState {
+ public static class BackpressureState {
private final JCQueue queue;
//No task is under backpressure initially
private final AtomicBoolean backpressure = new AtomicBoolean(false);
+ //The overflow count last time BP status was sent
+ private int lastOverflowCount = 0;
+
BackpressureState(JCQueue queue) {
this.queue = queue;
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
index f380769..eaab4e9 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
@@ -42,6 +42,7 @@ import org.apache.storm.cluster.IStormClusterState;
import org.apache.storm.cluster.VersionedData;
import org.apache.storm.daemon.StormCommon;
import org.apache.storm.daemon.supervisor.AdvancedFSOps;
+import org.apache.storm.daemon.worker.BackPressureTracker.BackpressureState;
import org.apache.storm.executor.IRunningExecutor;
import org.apache.storm.generated.Assignment;
import org.apache.storm.generated.DebugOptions;
@@ -88,6 +89,7 @@ public class WorkerState {
private static final Logger LOG = LoggerFactory.getLogger(WorkerState.class);
private static final long LOAD_REFRESH_INTERVAL_MS = 5000L;
+ private static final int RESEND_BACKPRESSURE_SIZE = 10000;
private static long dropCount = 0;
final Map<String, Object> conf;
final IContext mqContext;
@@ -533,8 +535,6 @@ public class WorkerState {
// Receives msgs from remote workers and feeds them to local executors. If any receiving local executor is under Back Pressure,
// informs other workers about back pressure situation. Runs in the NettyWorker thread.
private void transferLocalBatch(ArrayList<AddressedTuple> tupleBatch) {
- int lastOverflowCount = 0; // overflowQ size at the time the last BPStatus was sent
-
for (int i = 0; i < tupleBatch.size(); i++) {
AddressedTuple tuple = tupleBatch.get(i);
JCQueue queue = taskToExecutorQueue.get(tuple.dest);
@@ -548,16 +548,18 @@ public class WorkerState {
// 2- BP detected (i.e MainQ is full). So try adding to overflow
int currOverflowCount = queue.getOverflowCount();
- if (bpTracker.recordBackPressure(tuple.dest)) {
+ // get BP state object so only have to lookup once
+ BackpressureState bpState = bpTracker.getBackpressureState(tuple.dest);
+ if (bpTracker.recordBackPressure(bpState)) {
receiver.sendBackPressureStatus(bpTracker.getCurrStatus());
- lastOverflowCount = currOverflowCount;
+ bpTracker.setLastOverflowCount(bpState, currOverflowCount);
} else {
- if (currOverflowCount - lastOverflowCount > 10000) {
+ if (currOverflowCount - bpTracker.getLastOverflowCount(bpState) > RESEND_BACKPRESSURE_SIZE) {
// resend BP status, in case prev notification was missed or reordered
BackPressureStatus bpStatus = bpTracker.getCurrStatus();
receiver.sendBackPressureStatus(bpStatus);
- lastOverflowCount = currOverflowCount;
+ bpTracker.setLastOverflowCount(bpState, currOverflowCount);
LOG.debug("Re-sent BackPressure Status. OverflowCount = {}, BP Status ID = {}. ", currOverflowCount, bpStatus.id);
}
}
diff --git a/storm-client/test/jvm/org/apache/storm/daemon/worker/BackPressureTrackerTest.java b/storm-client/test/jvm/org/apache/storm/daemon/worker/BackPressureTrackerTest.java
index 7e891b5..f642c54 100644
--- a/storm-client/test/jvm/org/apache/storm/daemon/worker/BackPressureTrackerTest.java
+++ b/storm-client/test/jvm/org/apache/storm/daemon/worker/BackPressureTrackerTest.java
@@ -24,6 +24,8 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.Collections;
+
+import org.apache.storm.daemon.worker.BackPressureTracker.BackpressureState;
import org.apache.storm.messaging.netty.BackPressureStatus;
import org.apache.storm.shade.org.apache.curator.shaded.com.google.common.collect.ImmutableMap;
import org.apache.storm.utils.JCQueue;
@@ -38,7 +40,7 @@ public class BackPressureTrackerTest {
int taskIdNoBackPressure = 1;
JCQueue noBackPressureQueue = mock(JCQueue.class);
BackPressureTracker tracker = new BackPressureTracker(WORKER_ID,
- Collections.singletonMap(taskIdNoBackPressure, noBackPressureQueue));
+ Collections.singletonMap(taskIdNoBackPressure, noBackPressureQueue));
BackPressureStatus status = tracker.getCurrStatus();
@@ -57,7 +59,8 @@ public class BackPressureTrackerTest {
taskIdNoBackPressure, noBackPressureQueue,
taskIdBackPressure, backPressureQueue));
- boolean backpressureChanged = tracker.recordBackPressure(taskIdBackPressure);
+ BackpressureState state = tracker.getBackpressureState(taskIdBackPressure);
+ boolean backpressureChanged = tracker.recordBackPressure(state);
BackPressureStatus status = tracker.getCurrStatus();
assertThat(backpressureChanged, is(true));
@@ -72,9 +75,10 @@ public class BackPressureTrackerTest {
JCQueue queue = mock(JCQueue.class);
BackPressureTracker tracker = new BackPressureTracker(WORKER_ID, ImmutableMap.of(
taskId, queue));
- tracker.recordBackPressure(taskId);
+ BackpressureState state = tracker.getBackpressureState(taskId);
+ tracker.recordBackPressure(state);
- boolean backpressureChanged = tracker.recordBackPressure(taskId);
+ boolean backpressureChanged = tracker.recordBackPressure(state);
BackPressureStatus status = tracker.getCurrStatus();
assertThat(backpressureChanged, is(false));
@@ -89,7 +93,8 @@ public class BackPressureTrackerTest {
when(queue.isEmptyOverflow()).thenReturn(true);
BackPressureTracker tracker = new BackPressureTracker(WORKER_ID, ImmutableMap.of(
taskId, queue));
- tracker.recordBackPressure(taskId);
+ BackpressureState state = tracker.getBackpressureState(taskId);
+ tracker.recordBackPressure(state);
boolean backpressureChanged = tracker.refreshBpTaskList();
BackPressureStatus status = tracker.getCurrStatus();
@@ -106,7 +111,8 @@ public class BackPressureTrackerTest {
when(queue.isEmptyOverflow()).thenReturn(false);
BackPressureTracker tracker = new BackPressureTracker(WORKER_ID, ImmutableMap.of(
taskId, queue));
- tracker.recordBackPressure(taskId);
+ BackpressureState state = tracker.getBackpressureState(taskId);
+ tracker.recordBackPressure(state);
boolean backpressureChanged = tracker.refreshBpTaskList();
BackPressureStatus status = tracker.getCurrStatus();
@@ -116,4 +122,21 @@ public class BackPressureTrackerTest {
assertThat(status.bpTasks, contains(taskId));
}
+ @Test
+ public void testSetLastOverflowCount() {
+ int taskId = 1;
+ int overflow = 5;
+ JCQueue queue = mock(JCQueue.class);
+ BackPressureTracker tracker = new BackPressureTracker(WORKER_ID, ImmutableMap.of(
+ taskId, queue));
+ BackpressureState state = tracker.getBackpressureState(taskId);
+ tracker.recordBackPressure(state);
+ tracker.setLastOverflowCount(state, overflow);
+
+ BackpressureState retrievedState = tracker.getBackpressureState(taskId);
+ int lastOverflowCount = tracker.getLastOverflowCount(retrievedState);
+
+ assertThat(lastOverflowCount, is(overflow));
+ }
+
}