You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2022/07/14 22:24:09 UTC

[beam] branch master updated: Remove locks around ExecutionStateSampler (#22190)

This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 64bcc7df2b8 Remove locks around ExecutionStateSampler (#22190)
64bcc7df2b8 is described below

commit 64bcc7df2b86b9a531591a5f907e245f1af7998d
Author: Steven Niemitz <st...@gmail.com>
AuthorDate: Thu Jul 14 18:24:01 2022 -0400

    Remove locks around ExecutionStateSampler (#22190)
    
    * remove locks around ExecutionStateSampler
    
    * make all benchmarks use 10 threads
    
    * normalize benchmarks
    
    * more state
---
 .../core/metrics/ExecutionStateSampler.java        |  17 ++-
 .../core/metrics/ExecutionStateTracker.java        |  18 +++-
 .../dataflow/worker/DataflowExecutionContext.java  |   4 +-
 .../control/ExecutionStateSamplerBenchmark.java    | 118 ++++++++++++++-------
 .../ExecutionStateSamplerBenchmarkTest.java        |  33 +++++-
 5 files changed, 131 insertions(+), 59 deletions(-)

diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/ExecutionStateSampler.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/ExecutionStateSampler.java
index 2d39b377b57..6689d3ca31d 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/ExecutionStateSampler.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/ExecutionStateSampler.java
@@ -20,15 +20,15 @@ package org.apache.beam.runners.core.metrics;
 import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
 
 import java.io.Closeable;
-import java.util.HashSet;
+import java.util.Set;
 import java.util.concurrent.CancellationException;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import javax.annotation.concurrent.GuardedBy;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.checkerframework.checker.nullness.qual.Nullable;
@@ -37,10 +37,7 @@ import org.joda.time.DateTimeUtils.MillisProvider;
 /** Monitors the execution of one or more execution threads. */
 public class ExecutionStateSampler {
 
-  // We use a synchronized data structure (as opposed to a concurrent one) since synchronization
-  // is necessary to prevent races between tracker removal and the sampling thread iteration.
-  @GuardedBy("this")
-  private final HashSet<ExecutionStateTracker> activeTrackers = new HashSet<>();
+  private final Set<ExecutionStateTracker> activeTrackers = ConcurrentHashMap.newKeySet();
 
   private static final MillisProvider SYSTEM_MILLIS_PROVIDER = System::currentTimeMillis;
 
@@ -150,15 +147,13 @@ public class ExecutionStateSampler {
   }
 
   /** Add the tracker to the sampling set. */
-  synchronized void addTracker(ExecutionStateTracker tracker) {
+  void addTracker(ExecutionStateTracker tracker) {
     this.activeTrackers.add(tracker);
   }
 
   /** Remove the tracker from the sampling set. */
   void removeTracker(ExecutionStateTracker tracker) {
-    synchronized (this) {
-      activeTrackers.remove(tracker);
-    }
+    activeTrackers.remove(tracker);
 
     // Attribute any remaining time since the last sampling while removing the tracker.
     //
@@ -174,7 +169,7 @@ public class ExecutionStateSampler {
 
   /** Attributing sampling time to trackers. */
   @VisibleForTesting
-  public synchronized void doSampling(long millisSinceLastSample) {
+  public void doSampling(long millisSinceLastSample) {
     for (ExecutionStateTracker tracker : activeTrackers) {
       tracker.takeSample(millisSinceLastSample);
     }
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/ExecutionStateTracker.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/ExecutionStateTracker.java
index 969e9339bef..4575ddb6275 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/ExecutionStateTracker.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/ExecutionStateTracker.java
@@ -25,6 +25,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
 import org.checkerframework.checker.nullness.qual.Nullable;
@@ -45,6 +46,8 @@ public class ExecutionStateTracker implements Comparable<ExecutionStateTracker>
       new ConcurrentHashMap<>();
 
   private static final long LULL_REPORT_MS = TimeUnit.MINUTES.toMillis(5);
+  private static final AtomicIntegerFieldUpdater<ExecutionStateTracker> SAMPLING_UPDATER =
+      AtomicIntegerFieldUpdater.newUpdater(ExecutionStateTracker.class, "sampling");
 
   public static final String START_STATE_NAME = "start";
   public static final String PROCESS_STATE_NAME = "process";
@@ -117,6 +120,9 @@ public class ExecutionStateTracker implements Comparable<ExecutionStateTracker>
    */
   private volatile @Nullable ExecutionState currentState;
 
+  @SuppressWarnings("UnusedVariable")
+  private volatile int sampling = 0;
+
   /**
    * The current number of times that this {@link ExecutionStateTracker} has transitioned state.
    *
@@ -298,7 +304,17 @@ public class ExecutionStateTracker implements Comparable<ExecutionStateTracker>
     return nextLullReportMs;
   }
 
-  protected void takeSample(long millisSinceLastSample) {
+  void takeSample(long millisSinceLastSample) {
+    if (SAMPLING_UPDATER.compareAndSet(this, 0, 1)) {
+      try {
+        takeSampleOnce(millisSinceLastSample);
+      } finally {
+        SAMPLING_UPDATER.set(this, 0);
+      }
+    }
+  }
+
+  protected void takeSampleOnce(long millisSinceLastSample) {
     // These variables are read by Sampler thread, and written by Execution and Progress Reporting
     // threads.
     // Because there is no read/modify/write cycle in the Sampler thread, making them volatile
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java
index 0d8c270d6a8..d02637c7efa 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java
@@ -282,9 +282,9 @@ public abstract class DataflowExecutionContext<T extends DataflowStepContext> {
     }
 
     @Override
-    protected void takeSample(long millisSinceLastSample) {
+    protected void takeSampleOnce(long millisSinceLastSample) {
       elementExecutionTracker.takeSample(millisSinceLastSample);
-      super.takeSample(millisSinceLastSample);
+      super.takeSampleOnce(millisSinceLastSample);
     }
 
     @Override
diff --git a/sdks/java/harness/jmh/src/main/java/org/apache/beam/fn/harness/jmh/control/ExecutionStateSamplerBenchmark.java b/sdks/java/harness/jmh/src/main/java/org/apache/beam/fn/harness/jmh/control/ExecutionStateSamplerBenchmark.java
index 0a15302f009..b4de9b87dc0 100644
--- a/sdks/java/harness/jmh/src/main/java/org/apache/beam/fn/harness/jmh/control/ExecutionStateSamplerBenchmark.java
+++ b/sdks/java/harness/jmh/src/main/java/org/apache/beam/fn/harness/jmh/control/ExecutionStateSamplerBenchmark.java
@@ -35,15 +35,16 @@ import org.openjdk.jmh.annotations.Setup;
 import org.openjdk.jmh.annotations.State;
 import org.openjdk.jmh.annotations.TearDown;
 import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.infra.Blackhole;
 
 /** Benchmarks for sampling execution state. */
 public class ExecutionStateSamplerBenchmark {
   private static final String PTRANSFORM = "benchmarkPTransform";
 
-  @State(Scope.Benchmark)
-  public static class RunnersCoreStateSampler {
-    public final ExecutionStateSampler sampler = ExecutionStateSampler.newForTest();
-    public final ExecutionStateTracker tracker = new ExecutionStateTracker(sampler);
+  @State(Scope.Thread)
+  public static class RunnersCoreStateTracker {
+    public ExecutionStateTracker tracker;
+
     public final SimpleExecutionState state1 =
         new SimpleExecutionState(
             "process",
@@ -60,14 +61,13 @@ public class ExecutionStateSamplerBenchmark {
             Urns.PROCESS_BUNDLE_MSECS,
             new HashMap<>(Collections.singletonMap(Labels.PTRANSFORM, PTRANSFORM)));
 
-    @Setup(Level.Trial)
-    public void setup() {
-      sampler.start();
+    @Setup
+    public void setup(RunnersCoreStateSampler sharedState) {
+      tracker = new ExecutionStateTracker(sharedState.sampler);
     }
 
-    @TearDown(Level.Trial)
+    @TearDown
     public void tearDown() {
-      sampler.stop();
       // Print out the total millis so that JVM doesn't optimize code away.
       System.out.println(
           state1.getTotalMillis()
@@ -78,51 +78,82 @@ public class ExecutionStateSamplerBenchmark {
     }
   }
 
+  @State(Scope.Benchmark)
+  public static class RunnersCoreStateSampler {
+    public final ExecutionStateSampler sampler = ExecutionStateSampler.newForTest();
+
+    @Setup(Level.Trial)
+    public void setup() {
+      sampler.start();
+    }
+
+    @TearDown(Level.Trial)
+    public void tearDown() {
+      sampler.stop();
+    }
+  }
+
+  @State(Scope.Thread)
+  public static class HarnessStateTracker {
+    public org.apache.beam.fn.harness.control.ExecutionStateSampler.ExecutionStateTracker tracker;
+
+    public org.apache.beam.fn.harness.control.ExecutionStateSampler.ExecutionState state1;
+    public org.apache.beam.fn.harness.control.ExecutionStateSampler.ExecutionState state2;
+    public org.apache.beam.fn.harness.control.ExecutionStateSampler.ExecutionState state3;
+
+    @Setup
+    public void setup(HarnessStateSampler sharedState) {
+      tracker = sharedState.sampler.create();
+      state1 = tracker.create("1", PTRANSFORM, PTRANSFORM + "Name", "1");
+      state2 = tracker.create("2", PTRANSFORM, PTRANSFORM + "Name", "2");
+      state3 = tracker.create("3", PTRANSFORM, PTRANSFORM + "Name", "3");
+    }
+
+    @TearDown
+    public void tearDown() {
+      Map<String, ByteString> monitoringData = new HashMap<>();
+      // Print out the final monitoring data so that the JVM doesn't optimize code away.
+      tracker.updateFinalMonitoringData(monitoringData);
+      System.out.println(monitoringData);
+    }
+  }
+
   @State(Scope.Benchmark)
   public static class HarnessStateSampler {
     public final org.apache.beam.fn.harness.control.ExecutionStateSampler sampler =
         new org.apache.beam.fn.harness.control.ExecutionStateSampler(
             PipelineOptionsFactory.create(), System::currentTimeMillis);
-    public final org.apache.beam.fn.harness.control.ExecutionStateSampler.ExecutionStateTracker
-        tracker = sampler.create();
-    public final org.apache.beam.fn.harness.control.ExecutionStateSampler.ExecutionState state1 =
-        tracker.create("1", PTRANSFORM, PTRANSFORM + "Name", "1");
-    public final org.apache.beam.fn.harness.control.ExecutionStateSampler.ExecutionState state2 =
-        tracker.create("2", PTRANSFORM, PTRANSFORM + "Name", "2");
-    public final org.apache.beam.fn.harness.control.ExecutionStateSampler.ExecutionState state3 =
-        tracker.create("3", PTRANSFORM, PTRANSFORM + "Name", "3");
 
     @TearDown(Level.Trial)
     public void tearDown() {
       sampler.stop();
-      Map<String, ByteString> monitoringData = new HashMap<>();
-      tracker.updateFinalMonitoringData(monitoringData);
-      // Print out the total millis so that JVM doesn't optimize code away.
-      System.out.println(monitoringData);
     }
   }
 
   @Benchmark
-  @Threads(1)
-  public void testTinyBundleRunnersCoreStateSampler(RunnersCoreStateSampler state)
-      throws Exception {
-    state.tracker.activate();
+  @Threads(512)
+  public void testTinyBundleRunnersCoreStateSampler(
+      RunnersCoreStateTracker trackerState, Blackhole bh) throws Exception {
+    ExecutionStateTracker tracker = trackerState.tracker;
+    Closeable c = tracker.activate();
     for (int i = 0; i < 3; ) {
-      Closeable close1 = state.tracker.enterState(state.state1);
-      Closeable close2 = state.tracker.enterState(state.state2);
-      Closeable close3 = state.tracker.enterState(state.state3);
+      Closeable close1 = tracker.enterState(trackerState.state1);
+      Closeable close2 = tracker.enterState(trackerState.state2);
+      Closeable close3 = tracker.enterState(trackerState.state3);
       // trival code that is being sampled for this state
       i += 1;
+      bh.consume(i);
       close3.close();
       close2.close();
       close1.close();
     }
-    state.tracker.reset();
+    c.close();
   }
 
   @Benchmark
-  @Threads(1)
-  public void testTinyBundleHarnessStateSampler(HarnessStateSampler state) throws Exception {
+  @Threads(512)
+  public void testTinyBundleHarnessStateSampler(HarnessStateTracker state, Blackhole bh)
+      throws Exception {
     state.tracker.start("processBundleId");
     for (int i = 0; i < 3; ) {
       state.state1.activate();
@@ -130,6 +161,7 @@ public class ExecutionStateSamplerBenchmark {
       state.state3.activate();
       // trival code that is being sampled for this state
       i += 1;
+      bh.consume(i);
       state.state3.deactivate();
       state.state2.deactivate();
       state.state1.deactivate();
@@ -138,26 +170,29 @@ public class ExecutionStateSamplerBenchmark {
   }
 
   @Benchmark
-  @Threads(1)
-  public void testLargeBundleRunnersCoreStateSampler(RunnersCoreStateSampler state)
-      throws Exception {
-    state.tracker.activate();
+  @Threads(16)
+  public void testLargeBundleRunnersCoreStateSampler(
+      RunnersCoreStateTracker trackerState, Blackhole bh) throws Exception {
+    ExecutionStateTracker tracker = trackerState.tracker;
+    Closeable c = tracker.activate();
     for (int i = 0; i < 1000; ) {
-      Closeable close1 = state.tracker.enterState(state.state1);
-      Closeable close2 = state.tracker.enterState(state.state2);
-      Closeable close3 = state.tracker.enterState(state.state3);
+      Closeable close1 = tracker.enterState(trackerState.state1);
+      Closeable close2 = tracker.enterState(trackerState.state2);
+      Closeable close3 = tracker.enterState(trackerState.state3);
       // trival code that is being sampled for this state
       i += 1;
+      bh.consume(i);
       close3.close();
       close2.close();
       close1.close();
     }
-    state.tracker.reset();
+    c.close();
   }
 
   @Benchmark
-  @Threads(1)
-  public void testLargeBundleHarnessStateSampler(HarnessStateSampler state) throws Exception {
+  @Threads(16)
+  public void testLargeBundleHarnessStateSampler(HarnessStateTracker state, Blackhole bh)
+      throws Exception {
     state.tracker.start("processBundleId");
     for (int i = 0; i < 1000; ) {
       state.state1.activate();
@@ -165,6 +200,7 @@ public class ExecutionStateSamplerBenchmark {
       state.state3.activate();
       // trival code that is being sampled for this state
       i += 1;
+      bh.consume(i);
       state.state3.deactivate();
       state.state2.deactivate();
       state.state1.deactivate();
diff --git a/sdks/java/harness/jmh/src/test/java/org/apache/beam/fn/harness/jmh/control/ExecutionStateSamplerBenchmarkTest.java b/sdks/java/harness/jmh/src/test/java/org/apache/beam/fn/harness/jmh/control/ExecutionStateSamplerBenchmarkTest.java
index 3e735d9b59c..80564ac8922 100644
--- a/sdks/java/harness/jmh/src/test/java/org/apache/beam/fn/harness/jmh/control/ExecutionStateSamplerBenchmarkTest.java
+++ b/sdks/java/harness/jmh/src/test/java/org/apache/beam/fn/harness/jmh/control/ExecutionStateSamplerBenchmarkTest.java
@@ -18,41 +18,66 @@
 package org.apache.beam.fn.harness.jmh.control;
 
 import org.apache.beam.fn.harness.jmh.control.ExecutionStateSamplerBenchmark.HarnessStateSampler;
+import org.apache.beam.fn.harness.jmh.control.ExecutionStateSamplerBenchmark.HarnessStateTracker;
 import org.apache.beam.fn.harness.jmh.control.ExecutionStateSamplerBenchmark.RunnersCoreStateSampler;
+import org.apache.beam.fn.harness.jmh.control.ExecutionStateSamplerBenchmark.RunnersCoreStateTracker;
+import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
+import org.openjdk.jmh.infra.Blackhole;
 
 /** Tests for {@link ExecutionStateSamplerBenchmark}. */
 @RunWith(JUnit4.class)
 public class ExecutionStateSamplerBenchmarkTest {
+  private Blackhole blackhole;
+
+  @Before
+  public void before() {
+    blackhole =
+        new Blackhole(
+            "Today's password is swordfish. I understand instantiating Blackholes directly is dangerous.");
+  }
+
   @Test
   public void testTinyBundleRunnersCoreStateSampler() throws Exception {
     RunnersCoreStateSampler state = new RunnersCoreStateSampler();
+    RunnersCoreStateTracker threadState = new RunnersCoreStateTracker();
     state.setup();
-    new ExecutionStateSamplerBenchmark().testTinyBundleRunnersCoreStateSampler(state);
+    threadState.setup(state);
+    new ExecutionStateSamplerBenchmark()
+        .testTinyBundleRunnersCoreStateSampler(threadState, blackhole);
     state.tearDown();
   }
 
   @Test
   public void testLargeBundleRunnersCoreStateSampler() throws Exception {
     RunnersCoreStateSampler state = new RunnersCoreStateSampler();
+    RunnersCoreStateTracker threadState = new RunnersCoreStateTracker();
     state.setup();
-    new ExecutionStateSamplerBenchmark().testLargeBundleRunnersCoreStateSampler(state);
+    threadState.setup(state);
+    new ExecutionStateSamplerBenchmark()
+        .testLargeBundleRunnersCoreStateSampler(threadState, blackhole);
     state.tearDown();
   }
 
   @Test
   public void testTinyBundleHarnessStateSampler() throws Exception {
     HarnessStateSampler state = new HarnessStateSampler();
-    new ExecutionStateSamplerBenchmark().testTinyBundleHarnessStateSampler(state);
+    HarnessStateTracker threadState = new HarnessStateTracker();
+    threadState.setup(state);
+    new ExecutionStateSamplerBenchmark().testTinyBundleHarnessStateSampler(threadState, blackhole);
     state.tearDown();
+    threadState.tearDown();
   }
 
   @Test
   public void testLargeBundleHarnessStateSampler() throws Exception {
     HarnessStateSampler state = new HarnessStateSampler();
-    new ExecutionStateSamplerBenchmark().testLargeBundleHarnessStateSampler(state);
+    HarnessStateTracker threadState = new HarnessStateTracker();
+    threadState.setup(state);
+    new ExecutionStateSamplerBenchmark().testLargeBundleHarnessStateSampler(threadState, blackhole);
     state.tearDown();
+    threadState.tearDown();
   }
 }