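You are viewing a plain-text version of this content. The canonical link to it was a hyperlink on the word "here" in the original message, which is not preserved in this plain-text rendering.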
Posted to commits@beam.apache.org by lc...@apache.org on 2022/07/14 22:24:09 UTC
[beam] branch master updated: Remove locks around ExecutionStateSampler (#22190)
This is an automated email from the ASF dual-hosted git repository.
lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 64bcc7df2b8 Remove locks around ExecutionStateSampler (#22190)
64bcc7df2b8 is described below
commit 64bcc7df2b86b9a531591a5f907e245f1af7998d
Author: Steven Niemitz <st...@gmail.com>
AuthorDate: Thu Jul 14 18:24:01 2022 -0400
Remove locks around ExecutionStateSampler (#22190)
* remove locks around ExecutionStateSampler
* make all benchmarks use 10 threads
* normalize benchmarks
* more state
---
.../core/metrics/ExecutionStateSampler.java | 17 ++-
.../core/metrics/ExecutionStateTracker.java | 18 +++-
.../dataflow/worker/DataflowExecutionContext.java | 4 +-
.../control/ExecutionStateSamplerBenchmark.java | 118 ++++++++++++++-------
.../ExecutionStateSamplerBenchmarkTest.java | 33 +++++-
5 files changed, 131 insertions(+), 59 deletions(-)
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/ExecutionStateSampler.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/ExecutionStateSampler.java
index 2d39b377b57..6689d3ca31d 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/ExecutionStateSampler.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/ExecutionStateSampler.java
@@ -20,15 +20,15 @@ package org.apache.beam.runners.core.metrics;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
import java.io.Closeable;
-import java.util.HashSet;
+import java.util.Set;
import java.util.concurrent.CancellationException;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import javax.annotation.concurrent.GuardedBy;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.checkerframework.checker.nullness.qual.Nullable;
@@ -37,10 +37,7 @@ import org.joda.time.DateTimeUtils.MillisProvider;
/** Monitors the execution of one or more execution threads. */
public class ExecutionStateSampler {
- // We use a synchronized data structure (as opposed to a concurrent one) since synchronization
- // is necessary to prevent races between tracker removal and the sampling thread iteration.
- @GuardedBy("this")
- private final HashSet<ExecutionStateTracker> activeTrackers = new HashSet<>();
+ private final Set<ExecutionStateTracker> activeTrackers = ConcurrentHashMap.newKeySet();
private static final MillisProvider SYSTEM_MILLIS_PROVIDER = System::currentTimeMillis;
@@ -150,15 +147,13 @@ public class ExecutionStateSampler {
}
/** Add the tracker to the sampling set. */
- synchronized void addTracker(ExecutionStateTracker tracker) {
+ void addTracker(ExecutionStateTracker tracker) {
this.activeTrackers.add(tracker);
}
/** Remove the tracker from the sampling set. */
void removeTracker(ExecutionStateTracker tracker) {
- synchronized (this) {
- activeTrackers.remove(tracker);
- }
+ activeTrackers.remove(tracker);
// Attribute any remaining time since the last sampling while removing the tracker.
//
@@ -174,7 +169,7 @@ public class ExecutionStateSampler {
/** Attributing sampling time to trackers. */
@VisibleForTesting
- public synchronized void doSampling(long millisSinceLastSample) {
+ public void doSampling(long millisSinceLastSample) {
for (ExecutionStateTracker tracker : activeTrackers) {
tracker.takeSample(millisSinceLastSample);
}
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/ExecutionStateTracker.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/ExecutionStateTracker.java
index 969e9339bef..4575ddb6275 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/ExecutionStateTracker.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/ExecutionStateTracker.java
@@ -25,6 +25,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
import org.checkerframework.checker.nullness.qual.Nullable;
@@ -45,6 +46,8 @@ public class ExecutionStateTracker implements Comparable<ExecutionStateTracker>
new ConcurrentHashMap<>();
private static final long LULL_REPORT_MS = TimeUnit.MINUTES.toMillis(5);
+ private static final AtomicIntegerFieldUpdater<ExecutionStateTracker> SAMPLING_UPDATER =
+ AtomicIntegerFieldUpdater.newUpdater(ExecutionStateTracker.class, "sampling");
public static final String START_STATE_NAME = "start";
public static final String PROCESS_STATE_NAME = "process";
@@ -117,6 +120,9 @@ public class ExecutionStateTracker implements Comparable<ExecutionStateTracker>
*/
private volatile @Nullable ExecutionState currentState;
+ @SuppressWarnings("UnusedVariable")
+ private volatile int sampling = 0;
+
/**
* The current number of times that this {@link ExecutionStateTracker} has transitioned state.
*
@@ -298,7 +304,17 @@ public class ExecutionStateTracker implements Comparable<ExecutionStateTracker>
return nextLullReportMs;
}
- protected void takeSample(long millisSinceLastSample) {
+ void takeSample(long millisSinceLastSample) {
+ if (SAMPLING_UPDATER.compareAndSet(this, 0, 1)) {
+ try {
+ takeSampleOnce(millisSinceLastSample);
+ } finally {
+ SAMPLING_UPDATER.set(this, 0);
+ }
+ }
+ }
+
+ protected void takeSampleOnce(long millisSinceLastSample) {
// These variables are read by Sampler thread, and written by Execution and Progress Reporting
// threads.
// Because there is no read/modify/write cycle in the Sampler thread, making them volatile
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java
index 0d8c270d6a8..d02637c7efa 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java
@@ -282,9 +282,9 @@ public abstract class DataflowExecutionContext<T extends DataflowStepContext> {
}
@Override
- protected void takeSample(long millisSinceLastSample) {
+ protected void takeSampleOnce(long millisSinceLastSample) {
elementExecutionTracker.takeSample(millisSinceLastSample);
- super.takeSample(millisSinceLastSample);
+ super.takeSampleOnce(millisSinceLastSample);
}
@Override
diff --git a/sdks/java/harness/jmh/src/main/java/org/apache/beam/fn/harness/jmh/control/ExecutionStateSamplerBenchmark.java b/sdks/java/harness/jmh/src/main/java/org/apache/beam/fn/harness/jmh/control/ExecutionStateSamplerBenchmark.java
index 0a15302f009..b4de9b87dc0 100644
--- a/sdks/java/harness/jmh/src/main/java/org/apache/beam/fn/harness/jmh/control/ExecutionStateSamplerBenchmark.java
+++ b/sdks/java/harness/jmh/src/main/java/org/apache/beam/fn/harness/jmh/control/ExecutionStateSamplerBenchmark.java
@@ -35,15 +35,16 @@ import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.infra.Blackhole;
/** Benchmarks for sampling execution state. */
public class ExecutionStateSamplerBenchmark {
private static final String PTRANSFORM = "benchmarkPTransform";
- @State(Scope.Benchmark)
- public static class RunnersCoreStateSampler {
- public final ExecutionStateSampler sampler = ExecutionStateSampler.newForTest();
- public final ExecutionStateTracker tracker = new ExecutionStateTracker(sampler);
+ @State(Scope.Thread)
+ public static class RunnersCoreStateTracker {
+ public ExecutionStateTracker tracker;
+
public final SimpleExecutionState state1 =
new SimpleExecutionState(
"process",
@@ -60,14 +61,13 @@ public class ExecutionStateSamplerBenchmark {
Urns.PROCESS_BUNDLE_MSECS,
new HashMap<>(Collections.singletonMap(Labels.PTRANSFORM, PTRANSFORM)));
- @Setup(Level.Trial)
- public void setup() {
- sampler.start();
+ @Setup
+ public void setup(RunnersCoreStateSampler sharedState) {
+ tracker = new ExecutionStateTracker(sharedState.sampler);
}
- @TearDown(Level.Trial)
+ @TearDown
public void tearDown() {
- sampler.stop();
// Print out the total millis so that JVM doesn't optimize code away.
System.out.println(
state1.getTotalMillis()
@@ -78,51 +78,82 @@ public class ExecutionStateSamplerBenchmark {
}
}
+ @State(Scope.Benchmark)
+ public static class RunnersCoreStateSampler {
+ public final ExecutionStateSampler sampler = ExecutionStateSampler.newForTest();
+
+ @Setup(Level.Trial)
+ public void setup() {
+ sampler.start();
+ }
+
+ @TearDown(Level.Trial)
+ public void tearDown() {
+ sampler.stop();
+ }
+ }
+
+ @State(Scope.Thread)
+ public static class HarnessStateTracker {
+ public org.apache.beam.fn.harness.control.ExecutionStateSampler.ExecutionStateTracker tracker;
+
+ public org.apache.beam.fn.harness.control.ExecutionStateSampler.ExecutionState state1;
+ public org.apache.beam.fn.harness.control.ExecutionStateSampler.ExecutionState state2;
+ public org.apache.beam.fn.harness.control.ExecutionStateSampler.ExecutionState state3;
+
+ @Setup
+ public void setup(HarnessStateSampler sharedState) {
+ tracker = sharedState.sampler.create();
+ state1 = tracker.create("1", PTRANSFORM, PTRANSFORM + "Name", "1");
+ state2 = tracker.create("2", PTRANSFORM, PTRANSFORM + "Name", "2");
+ state3 = tracker.create("3", PTRANSFORM, PTRANSFORM + "Name", "3");
+ }
+
+ @TearDown
+ public void tearDown() {
+ Map<String, ByteString> monitoringData = new HashMap<>();
+ // Print out the total millis so that JVM doesn't optimize code away.
+ tracker.updateFinalMonitoringData(monitoringData);
+ System.out.println(monitoringData);
+ }
+ }
+
@State(Scope.Benchmark)
public static class HarnessStateSampler {
public final org.apache.beam.fn.harness.control.ExecutionStateSampler sampler =
new org.apache.beam.fn.harness.control.ExecutionStateSampler(
PipelineOptionsFactory.create(), System::currentTimeMillis);
- public final org.apache.beam.fn.harness.control.ExecutionStateSampler.ExecutionStateTracker
- tracker = sampler.create();
- public final org.apache.beam.fn.harness.control.ExecutionStateSampler.ExecutionState state1 =
- tracker.create("1", PTRANSFORM, PTRANSFORM + "Name", "1");
- public final org.apache.beam.fn.harness.control.ExecutionStateSampler.ExecutionState state2 =
- tracker.create("2", PTRANSFORM, PTRANSFORM + "Name", "2");
- public final org.apache.beam.fn.harness.control.ExecutionStateSampler.ExecutionState state3 =
- tracker.create("3", PTRANSFORM, PTRANSFORM + "Name", "3");
@TearDown(Level.Trial)
public void tearDown() {
sampler.stop();
- Map<String, ByteString> monitoringData = new HashMap<>();
- tracker.updateFinalMonitoringData(monitoringData);
- // Print out the total millis so that JVM doesn't optimize code away.
- System.out.println(monitoringData);
}
}
@Benchmark
- @Threads(1)
- public void testTinyBundleRunnersCoreStateSampler(RunnersCoreStateSampler state)
- throws Exception {
- state.tracker.activate();
+ @Threads(512)
+ public void testTinyBundleRunnersCoreStateSampler(
+ RunnersCoreStateTracker trackerState, Blackhole bh) throws Exception {
+ ExecutionStateTracker tracker = trackerState.tracker;
+ Closeable c = tracker.activate();
for (int i = 0; i < 3; ) {
- Closeable close1 = state.tracker.enterState(state.state1);
- Closeable close2 = state.tracker.enterState(state.state2);
- Closeable close3 = state.tracker.enterState(state.state3);
+ Closeable close1 = tracker.enterState(trackerState.state1);
+ Closeable close2 = tracker.enterState(trackerState.state2);
+ Closeable close3 = tracker.enterState(trackerState.state3);
// trival code that is being sampled for this state
i += 1;
+ bh.consume(i);
close3.close();
close2.close();
close1.close();
}
- state.tracker.reset();
+ c.close();
}
@Benchmark
- @Threads(1)
- public void testTinyBundleHarnessStateSampler(HarnessStateSampler state) throws Exception {
+ @Threads(512)
+ public void testTinyBundleHarnessStateSampler(HarnessStateTracker state, Blackhole bh)
+ throws Exception {
state.tracker.start("processBundleId");
for (int i = 0; i < 3; ) {
state.state1.activate();
@@ -130,6 +161,7 @@ public class ExecutionStateSamplerBenchmark {
state.state3.activate();
// trival code that is being sampled for this state
i += 1;
+ bh.consume(i);
state.state3.deactivate();
state.state2.deactivate();
state.state1.deactivate();
@@ -138,26 +170,29 @@ public class ExecutionStateSamplerBenchmark {
}
@Benchmark
- @Threads(1)
- public void testLargeBundleRunnersCoreStateSampler(RunnersCoreStateSampler state)
- throws Exception {
- state.tracker.activate();
+ @Threads(16)
+ public void testLargeBundleRunnersCoreStateSampler(
+ RunnersCoreStateTracker trackerState, Blackhole bh) throws Exception {
+ ExecutionStateTracker tracker = trackerState.tracker;
+ Closeable c = tracker.activate();
for (int i = 0; i < 1000; ) {
- Closeable close1 = state.tracker.enterState(state.state1);
- Closeable close2 = state.tracker.enterState(state.state2);
- Closeable close3 = state.tracker.enterState(state.state3);
+ Closeable close1 = tracker.enterState(trackerState.state1);
+ Closeable close2 = tracker.enterState(trackerState.state2);
+ Closeable close3 = tracker.enterState(trackerState.state3);
// trival code that is being sampled for this state
i += 1;
+ bh.consume(i);
close3.close();
close2.close();
close1.close();
}
- state.tracker.reset();
+ c.close();
}
@Benchmark
- @Threads(1)
- public void testLargeBundleHarnessStateSampler(HarnessStateSampler state) throws Exception {
+ @Threads(16)
+ public void testLargeBundleHarnessStateSampler(HarnessStateTracker state, Blackhole bh)
+ throws Exception {
state.tracker.start("processBundleId");
for (int i = 0; i < 1000; ) {
state.state1.activate();
@@ -165,6 +200,7 @@ public class ExecutionStateSamplerBenchmark {
state.state3.activate();
// trival code that is being sampled for this state
i += 1;
+ bh.consume(i);
state.state3.deactivate();
state.state2.deactivate();
state.state1.deactivate();
diff --git a/sdks/java/harness/jmh/src/test/java/org/apache/beam/fn/harness/jmh/control/ExecutionStateSamplerBenchmarkTest.java b/sdks/java/harness/jmh/src/test/java/org/apache/beam/fn/harness/jmh/control/ExecutionStateSamplerBenchmarkTest.java
index 3e735d9b59c..80564ac8922 100644
--- a/sdks/java/harness/jmh/src/test/java/org/apache/beam/fn/harness/jmh/control/ExecutionStateSamplerBenchmarkTest.java
+++ b/sdks/java/harness/jmh/src/test/java/org/apache/beam/fn/harness/jmh/control/ExecutionStateSamplerBenchmarkTest.java
@@ -18,41 +18,66 @@
package org.apache.beam.fn.harness.jmh.control;
import org.apache.beam.fn.harness.jmh.control.ExecutionStateSamplerBenchmark.HarnessStateSampler;
+import org.apache.beam.fn.harness.jmh.control.ExecutionStateSamplerBenchmark.HarnessStateTracker;
import org.apache.beam.fn.harness.jmh.control.ExecutionStateSamplerBenchmark.RunnersCoreStateSampler;
+import org.apache.beam.fn.harness.jmh.control.ExecutionStateSamplerBenchmark.RunnersCoreStateTracker;
+import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
+import org.openjdk.jmh.infra.Blackhole;
/** Tests for {@link ExecutionStateSamplerBenchmark}. */
@RunWith(JUnit4.class)
public class ExecutionStateSamplerBenchmarkTest {
+ private Blackhole blackhole;
+
+ @Before
+ public void before() {
+ blackhole =
+ new Blackhole(
+ "Today's password is swordfish. I understand instantiating Blackholes directly is dangerous.");
+ }
+
@Test
public void testTinyBundleRunnersCoreStateSampler() throws Exception {
RunnersCoreStateSampler state = new RunnersCoreStateSampler();
+ RunnersCoreStateTracker threadState = new RunnersCoreStateTracker();
state.setup();
- new ExecutionStateSamplerBenchmark().testTinyBundleRunnersCoreStateSampler(state);
+ threadState.setup(state);
+ new ExecutionStateSamplerBenchmark()
+ .testTinyBundleRunnersCoreStateSampler(threadState, blackhole);
state.tearDown();
}
@Test
public void testLargeBundleRunnersCoreStateSampler() throws Exception {
RunnersCoreStateSampler state = new RunnersCoreStateSampler();
+ RunnersCoreStateTracker threadState = new RunnersCoreStateTracker();
state.setup();
- new ExecutionStateSamplerBenchmark().testLargeBundleRunnersCoreStateSampler(state);
+ threadState.setup(state);
+ new ExecutionStateSamplerBenchmark()
+ .testLargeBundleRunnersCoreStateSampler(threadState, blackhole);
state.tearDown();
}
@Test
public void testTinyBundleHarnessStateSampler() throws Exception {
HarnessStateSampler state = new HarnessStateSampler();
- new ExecutionStateSamplerBenchmark().testTinyBundleHarnessStateSampler(state);
+ HarnessStateTracker threadState = new HarnessStateTracker();
+ threadState.setup(state);
+ new ExecutionStateSamplerBenchmark().testTinyBundleHarnessStateSampler(threadState, blackhole);
state.tearDown();
+ threadState.tearDown();
}
@Test
public void testLargeBundleHarnessStateSampler() throws Exception {
HarnessStateSampler state = new HarnessStateSampler();
- new ExecutionStateSamplerBenchmark().testLargeBundleHarnessStateSampler(state);
+ HarnessStateTracker threadState = new HarnessStateTracker();
+ threadState.setup(state);
+ new ExecutionStateSamplerBenchmark().testLargeBundleHarnessStateSampler(threadState, blackhole);
state.tearDown();
+ threadState.tearDown();
}
}