You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by xi...@apache.org on 2023/04/13 21:40:44 UTC

[beam] branch master updated: Extract BundleManager to an Interface in SamzaRunner (#26268)

This is an automated email from the ASF dual-hosted git repository.

xinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 5a9ab685d9b Extract BundleManager to an Interface in SamzaRunner (#26268)
5a9ab685d9b is described below

commit 5a9ab685d9bb7a1df901f535cfb7b5c106fbf927
Author: Katie Liu <ka...@linkedin.com>
AuthorDate: Thu Apr 13 14:40:36 2023 -0700

    Extract BundleManager to an Interface in SamzaRunner (#26268)
---
 .../beam/runners/samza/runtime/BundleManager.java  | 313 +--------------------
 ...undleManager.java => ClassicBundleManager.java} |  39 +--
 .../apache/beam/runners/samza/runtime/DoFnOp.java  |   2 +-
 ...agerTest.java => ClassicBundleManagerTest.java} |  16 +-
 4 files changed, 29 insertions(+), 341 deletions(-)

diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/BundleManager.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/BundleManager.java
index 873c5f3005f..1c89f770952 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/BundleManager.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/BundleManager.java
@@ -17,317 +17,18 @@
  */
 package org.apache.beam.runners.samza.runtime;
 
-import java.util.Collection;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionStage;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.BiConsumer;
-import javax.annotation.Nullable;
-import org.apache.beam.runners.core.StateNamespaces;
-import org.apache.beam.runners.core.TimerInternals;
-import org.apache.beam.sdk.state.TimeDomain;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
-import org.apache.samza.operators.Scheduler;
-import org.joda.time.Duration;
 import org.joda.time.Instant;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-/**
- * Bundle management for the {@link DoFnOp} that handles lifecycle of a bundle. It also serves as a
- * proxy for the {@link DoFnOp} to process watermark and decides to 1. Hold watermark if there is at
- * least one bundle in progress. 2. Propagates the watermark to downstream DAG, if all the previous
- * bundles have completed.
- *
- * <p>A bundle is considered complete only when the outputs corresponding to each element in the
- * bundle have been resolved and the watermark associated with the bundle(if any) is propagated
- * downstream. The output of an element is considered resolved based on the nature of the ParDoFn 1.
- * In case of synchronous ParDo, outputs of the element is resolved immediately after the
- * processElement returns. 2. In case of asynchronous ParDo, outputs of the element is resolved when
- * all the future emitted by the processElement is resolved.
- *
- * <p>This class is not thread safe and the current implementation relies on the assumption that
- * messages are dispatched to BundleManager in a single threaded mode.
- *
- * @param <OutT> output type of the {@link DoFnOp}
- */
-@SuppressWarnings({
-  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
-})
-public class BundleManager<OutT> {
-  private static final Logger LOG = LoggerFactory.getLogger(BundleManager.class);
-  private static final long MIN_BUNDLE_CHECK_TIME_MS = 10L;
-
-  private final long maxBundleSize;
-  private final long maxBundleTimeMs;
-  private final BundleProgressListener<OutT> bundleProgressListener;
-  private final FutureCollector<OutT> futureCollector;
-  private final Scheduler<KeyedTimerData<Void>> bundleTimerScheduler;
-  private final String bundleCheckTimerId;
-
-  // Number elements belonging to the current active bundle
-  private transient AtomicLong currentBundleElementCount;
-  // Number of bundles that are in progress but not yet finished
-  private transient AtomicLong pendingBundleCount;
-  // Denotes the start time of the current active bundle
-  private transient AtomicLong bundleStartTime;
-  // Denotes if there is an active in progress bundle. Note at a given time, we can have multiple
-  // bundle in progress.
-  // This flag denotes if there is a bundle that is current and hasn't been closed.
-  private transient AtomicBoolean isBundleStarted;
-  // Holder for watermark which gets propagated when the bundle is finished.
-  private transient Instant bundleWatermarkHold;
-  // A future that is completed once all futures belonging to the current active bundle are
-  // completed.  The value is null if there are no futures in the current active bundle.
-  private transient AtomicReference<CompletableFuture<Void>> currentActiveBundleDoneFutureReference;
-  private transient CompletionStage<Void> watermarkFuture;
-
-  public BundleManager(
-      BundleProgressListener<OutT> bundleProgressListener,
-      FutureCollector<OutT> futureCollector,
-      long maxBundleSize,
-      long maxBundleTimeMs,
-      Scheduler<KeyedTimerData<Void>> bundleTimerScheduler,
-      String bundleCheckTimerId) {
-    this.maxBundleSize = maxBundleSize;
-    this.maxBundleTimeMs = maxBundleTimeMs;
-    this.bundleProgressListener = bundleProgressListener;
-    this.bundleTimerScheduler = bundleTimerScheduler;
-    this.bundleCheckTimerId = bundleCheckTimerId;
-    this.futureCollector = futureCollector;
-
-    if (maxBundleSize > 1) {
-      scheduleNextBundleCheck();
-    }
-
-    // instance variable initialization for bundle tracking
-    this.bundleStartTime = new AtomicLong(Long.MAX_VALUE);
-    this.currentActiveBundleDoneFutureReference = new AtomicReference<>();
-    this.currentBundleElementCount = new AtomicLong(0L);
-    this.isBundleStarted = new AtomicBoolean(false);
-    this.pendingBundleCount = new AtomicLong(0L);
-    this.watermarkFuture = CompletableFuture.completedFuture(null);
-  }
-
-  /*
-   * Schedule in processing time to check whether the current bundle should be closed. Note that
-   * we only approximately achieve max bundle time by checking as frequent as half of the max bundle
-   * time set by users. This would violate the max bundle time by up to half of it but should
-   * acceptable in most cases (and cheaper than scheduling a timer at the beginning of every bundle).
-   */
-  private void scheduleNextBundleCheck() {
-    final Instant nextBundleCheckTime =
-        Instant.now().plus(Duration.millis(maxBundleTimeMs / 2 + MIN_BUNDLE_CHECK_TIME_MS));
-    final TimerInternals.TimerData timerData =
-        TimerInternals.TimerData.of(
-            this.bundleCheckTimerId,
-            StateNamespaces.global(),
-            nextBundleCheckTime,
-            nextBundleCheckTime,
-            TimeDomain.PROCESSING_TIME);
-    bundleTimerScheduler.schedule(
-        new KeyedTimerData<>(new byte[0], null, timerData), nextBundleCheckTime.getMillis());
-  }
-
-  void tryStartBundle() {
-    futureCollector.prepare();
-
-    if (isBundleStarted.compareAndSet(false, true)) {
-      LOG.debug("Starting a new bundle.");
-      // make sure the previous bundle is sealed and futures are cleared
-      Preconditions.checkArgument(
-          currentActiveBundleDoneFutureReference.get() == null,
-          "Current active bundle done future should be null before starting a new bundle.");
-      bundleStartTime.set(System.currentTimeMillis());
-      pendingBundleCount.incrementAndGet();
-      bundleProgressListener.onBundleStarted();
-    }
-
-    currentBundleElementCount.incrementAndGet();
-  }
-
-  void processWatermark(Instant watermark, OpEmitter<OutT> emitter) {
-    // propagate watermark immediately if no bundle is in progress and all the previous bundles have
-    // completed.
-    if (!isBundleStarted() && pendingBundleCount.get() == 0) {
-      LOG.debug("Propagating watermark: {} directly since no bundle in progress.", watermark);
-      bundleProgressListener.onWatermark(watermark, emitter);
-      return;
-    }
-
-    // hold back the watermark since there is either a bundle in progress or previously closed
-    // bundles are unfinished.
-    this.bundleWatermarkHold = watermark;
-
-    // for batch mode, the max watermark should force the bundle to close
-    if (BoundedWindow.TIMESTAMP_MAX_VALUE.equals(watermark)) {
-      /*
-       * Due to lack of async watermark function, we block on the previous watermark futures before propagating the watermark
-       * downstream. If a bundle is in progress tryFinishBundle() fill force the bundle to close and emit watermark.
-       * If no bundle in progress, we progress watermark explicitly after the completion of previous watermark futures.
-       */
-      if (isBundleStarted()) {
-        LOG.info(
-            "Received max watermark. Triggering finish bundle before flushing the watermark downstream.");
-        tryFinishBundle(emitter);
-        watermarkFuture.toCompletableFuture().join();
-      } else {
-        LOG.info(
-            "Received max watermark. Waiting for previous bundles to complete before flushing the watermark downstream.");
-        watermarkFuture.toCompletableFuture().join();
-        bundleProgressListener.onWatermark(watermark, emitter);
-      }
-    }
-  }
-
-  void processTimer(KeyedTimerData<Void> keyedTimerData, OpEmitter<OutT> emitter) {
-    // this is internal timer in processing time to check whether a bundle should be closed
-    if (bundleCheckTimerId.equals(keyedTimerData.getTimerData().getTimerId())) {
-      tryFinishBundle(emitter);
-      scheduleNextBundleCheck();
-    }
-  }
-
-  /**
-   * Signal the bundle manager to handle failure. We discard the output collected as part of
-   * processing the current element and reset the bundle count.
-   *
-   * @param t failure cause
-   */
-  void signalFailure(Throwable t) {
-    LOG.error("Encountered error during processing the message. Discarding the output due to: ", t);
-    futureCollector.discard();
-    // reset the bundle start flag only if the bundle has started
-    isBundleStarted.compareAndSet(true, false);
+public interface BundleManager<OutT> {
+  void tryStartBundle();
 
-    // bundle start may not necessarily mean we have actually started the bundle since some of the
-    // invariant check conditions within bundle start could throw exceptions. so rely on bundle
-    // start time
-    if (bundleStartTime.get() != Long.MAX_VALUE) {
-      currentBundleElementCount.set(0L);
-      bundleStartTime.set(Long.MAX_VALUE);
-      pendingBundleCount.decrementAndGet();
-      currentActiveBundleDoneFutureReference.set(null);
-    }
-  }
-
-  void tryFinishBundle(OpEmitter<OutT> emitter) {
-
-    // we need to seal the output for each element within a bundle irrespective of the whether we
-    // decide to finish the
-    // bundle or not
-    CompletionStage<Collection<WindowedValue<OutT>>> outputFuture = futureCollector.finish();
-
-    if (shouldFinishBundle() && isBundleStarted.compareAndSet(true, false)) {
-      LOG.debug("Finishing the current bundle.");
-
-      // reset the bundle count
-      // seal the bundle and emit the result future (collection of results)
-      // chain the finish bundle invocation on the finish bundle
-      currentBundleElementCount.set(0L);
-      bundleStartTime.set(Long.MAX_VALUE);
-      Instant watermarkHold = bundleWatermarkHold;
-      bundleWatermarkHold = null;
-
-      CompletionStage<Void> currentActiveBundleDoneFuture =
-          currentActiveBundleDoneFutureReference.get();
-      outputFuture =
-          outputFuture.thenCombine(
-              currentActiveBundleDoneFuture != null
-                  ? currentActiveBundleDoneFuture
-                  : CompletableFuture.completedFuture(null),
-              (res, ignored) -> {
-                bundleProgressListener.onBundleFinished(emitter);
-                return res;
-              });
-
-      BiConsumer<Collection<WindowedValue<OutT>>, Void> watermarkPropagationFn;
-      if (watermarkHold == null) {
-        watermarkPropagationFn = (ignored, res) -> pendingBundleCount.decrementAndGet();
-      } else {
-        watermarkPropagationFn =
-            (ignored, res) -> {
-              LOG.debug("Propagating watermark: {} to downstream.", watermarkHold);
-              bundleProgressListener.onWatermark(watermarkHold, emitter);
-              pendingBundleCount.decrementAndGet();
-            };
-      }
+  void processWatermark(Instant watermark, OpEmitter<OutT> emitter);
 
-      // We chain the current watermark emission with previous watermark and the output futures
-      // since bundles can finish out of order but we still want the watermark to be emitted in
-      // order.
-      watermarkFuture = outputFuture.thenAcceptBoth(watermarkFuture, watermarkPropagationFn);
-      currentActiveBundleDoneFutureReference.set(null);
-    } else if (isBundleStarted.get()) {
-      final CompletableFuture<Collection<WindowedValue<OutT>>> finalOutputFuture =
-          outputFuture.toCompletableFuture();
-      currentActiveBundleDoneFutureReference.updateAndGet(
-          maybePrevFuture -> {
-            CompletableFuture<Void> prevFuture =
-                maybePrevFuture != null ? maybePrevFuture : CompletableFuture.completedFuture(null);
+  void processTimer(KeyedTimerData<Void> keyedTimerData, OpEmitter<OutT> emitter);
 
-            return CompletableFuture.allOf(prevFuture, finalOutputFuture);
-          });
-    }
+  void signalFailure(Throwable t);
 
-    // emit the future to the propagate it to rest of the DAG
-    emitter.emitFuture(outputFuture);
-  }
-
-  @VisibleForTesting
-  long getCurrentBundleElementCount() {
-    return currentBundleElementCount.longValue();
-  }
-
-  @VisibleForTesting
-  @Nullable
-  CompletionStage<Void> getCurrentBundleDoneFuture() {
-    return currentActiveBundleDoneFutureReference.get();
-  }
-
-  @VisibleForTesting
-  void setCurrentBundleDoneFuture(CompletableFuture<Void> currentBundleResultFuture) {
-    this.currentActiveBundleDoneFutureReference.set(currentBundleResultFuture);
-  }
-
-  @VisibleForTesting
-  long getPendingBundleCount() {
-    return pendingBundleCount.longValue();
-  }
-
-  @VisibleForTesting
-  void setPendingBundleCount(long value) {
-    pendingBundleCount.set(value);
-  }
-
-  @VisibleForTesting
-  boolean isBundleStarted() {
-    return isBundleStarted.get();
-  }
-
-  @VisibleForTesting
-  void setBundleWatermarkHold(Instant watermark) {
-    this.bundleWatermarkHold = watermark;
-  }
-
-  /**
-   * We close the current bundle in progress if one of the following criteria is met 1. The bundle
-   * count &ge; maxBundleSize 2. Time elapsed since the bundle started is &ge; maxBundleTimeMs 3.
-   * Watermark hold equals to TIMESTAMP_MAX_VALUE which usually is the case for bounded jobs
-   *
-   * @return true - if one of the criteria above is satisfied; false - otherwise
-   */
-  private boolean shouldFinishBundle() {
-    return isBundleStarted.get()
-        && (currentBundleElementCount.get() >= maxBundleSize
-            || System.currentTimeMillis() - bundleStartTime.get() >= maxBundleTimeMs
-            || BoundedWindow.TIMESTAMP_MAX_VALUE.equals(bundleWatermarkHold));
-  }
+  void tryFinishBundle(OpEmitter<OutT> emitter);
 
   /**
    * A listener used to track the lifecycle of a bundle. Typically, the lifecycle of a bundle
@@ -339,7 +40,7 @@ public class BundleManager<OutT> {
    *
    * @param <OutT>
    */
-  public interface BundleProgressListener<OutT> {
+  interface BundleProgressListener<OutT> {
     void onBundleStarted();
 
     void onBundleFinished(OpEmitter<OutT> emitter);
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/BundleManager.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/ClassicBundleManager.java
similarity index 92%
copy from runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/BundleManager.java
copy to runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/ClassicBundleManager.java
index 873c5f3005f..4a90e468824 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/BundleManager.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/ClassicBundleManager.java
@@ -59,8 +59,8 @@ import org.slf4j.LoggerFactory;
 @SuppressWarnings({
   "nullness" // TODO(https://github.com/apache/beam/issues/20497)
 })
-public class BundleManager<OutT> {
-  private static final Logger LOG = LoggerFactory.getLogger(BundleManager.class);
+public class ClassicBundleManager<OutT> implements BundleManager<OutT> {
+  private static final Logger LOG = LoggerFactory.getLogger(ClassicBundleManager.class);
   private static final long MIN_BUNDLE_CHECK_TIME_MS = 10L;
 
   private final long maxBundleSize;
@@ -87,7 +87,7 @@ public class BundleManager<OutT> {
   private transient AtomicReference<CompletableFuture<Void>> currentActiveBundleDoneFutureReference;
   private transient CompletionStage<Void> watermarkFuture;
 
-  public BundleManager(
+  public ClassicBundleManager(
       BundleProgressListener<OutT> bundleProgressListener,
       FutureCollector<OutT> futureCollector,
       long maxBundleSize,
@@ -134,7 +134,8 @@ public class BundleManager<OutT> {
         new KeyedTimerData<>(new byte[0], null, timerData), nextBundleCheckTime.getMillis());
   }
 
-  void tryStartBundle() {
+  @Override
+  public void tryStartBundle() {
     futureCollector.prepare();
 
     if (isBundleStarted.compareAndSet(false, true)) {
@@ -151,7 +152,8 @@ public class BundleManager<OutT> {
     currentBundleElementCount.incrementAndGet();
   }
 
-  void processWatermark(Instant watermark, OpEmitter<OutT> emitter) {
+  @Override
+  public void processWatermark(Instant watermark, OpEmitter<OutT> emitter) {
     // propagate watermark immediately if no bundle is in progress and all the previous bundles have
     // completed.
     if (!isBundleStarted() && pendingBundleCount.get() == 0) {
@@ -185,7 +187,8 @@ public class BundleManager<OutT> {
     }
   }
 
-  void processTimer(KeyedTimerData<Void> keyedTimerData, OpEmitter<OutT> emitter) {
+  @Override
+  public void processTimer(KeyedTimerData<Void> keyedTimerData, OpEmitter<OutT> emitter) {
     // this is internal timer in processing time to check whether a bundle should be closed
     if (bundleCheckTimerId.equals(keyedTimerData.getTimerData().getTimerId())) {
       tryFinishBundle(emitter);
@@ -199,7 +202,8 @@ public class BundleManager<OutT> {
    *
    * @param t failure cause
    */
-  void signalFailure(Throwable t) {
+  @Override
+  public void signalFailure(Throwable t) {
     LOG.error("Encountered error during processing the message. Discarding the output due to: ", t);
     futureCollector.discard();
     // reset the bundle start flag only if the bundle has started
@@ -216,7 +220,8 @@ public class BundleManager<OutT> {
     }
   }
 
-  void tryFinishBundle(OpEmitter<OutT> emitter) {
+  @Override
+  public void tryFinishBundle(OpEmitter<OutT> emitter) {
 
     // we need to seal the output for each element within a bundle irrespective of the whether we
     // decide to finish the
@@ -328,22 +333,4 @@ public class BundleManager<OutT> {
             || System.currentTimeMillis() - bundleStartTime.get() >= maxBundleTimeMs
             || BoundedWindow.TIMESTAMP_MAX_VALUE.equals(bundleWatermarkHold));
   }
-
-  /**
-   * A listener used to track the lifecycle of a bundle. Typically, the lifecycle of a bundle
-   * consists of 1. Start bundle - Invoked when the bundle is started 2. Finish bundle - Invoked
-   * when the bundle is complete. Refer to the docs under {@link BundleManager} for definition on
-   * when a bundle is considered complete. 3. onWatermark - Invoked when watermark is ready to be
-   * propagated to downstream DAG. Refer to the docs under {@link BundleManager} on when watermark
-   * is held vs propagated.
-   *
-   * @param <OutT>
-   */
-  public interface BundleProgressListener<OutT> {
-    void onBundleStarted();
-
-    void onBundleFinished(OpEmitter<OutT> emitter);
-
-    void onWatermark(Instant watermark, OpEmitter<OutT> emitter);
-  }
 }
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java
index 35661ae86fe..c693754b5b9 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java
@@ -200,7 +200,7 @@ public class DoFnOp<InT, FnOutT, OutT> implements Op<InT, OutT, Void> {
     final FutureCollector<OutT> outputFutureCollector = createFutureCollector();
 
     this.bundleManager =
-        new BundleManager<>(
+        new ClassicBundleManager<>(
             createBundleProgressListener(),
             outputFutureCollector,
             samzaPipelineOptions.getMaxBundleSize(),
diff --git a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/BundleManagerTest.java b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/ClassicBundleManagerTest.java
similarity index 97%
rename from runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/BundleManagerTest.java
rename to runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/ClassicBundleManagerTest.java
index 91422097e83..f8f30a8d2f8 100644
--- a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/BundleManagerTest.java
+++ b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/ClassicBundleManagerTest.java
@@ -42,24 +42,24 @@ import org.junit.Before;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 
-/** Unit tests for {@linkplain BundleManager}. */
-public final class BundleManagerTest {
+/** Unit tests for {@linkplain ClassicBundleManager}. */
+public final class ClassicBundleManagerTest {
   private static final long MAX_BUNDLE_SIZE = 3;
   private static final long MAX_BUNDLE_TIME_MS = 2000;
   private static final String BUNDLE_CHECK_TIMER_ID = "bundle-check-test-timer";
 
   private FutureCollector<String> mockFutureCollector;
-  private BundleManager<String> bundleManager;
-  private BundleManager.BundleProgressListener<String> bundleProgressListener;
+  private ClassicBundleManager<String> bundleManager;
+  private ClassicBundleManager.BundleProgressListener<String> bundleProgressListener;
   private Scheduler<KeyedTimerData<Void>> mockScheduler;
 
   @Before
   public void setUp() {
     mockFutureCollector = mock(FutureCollector.class);
-    bundleProgressListener = mock(BundleManager.BundleProgressListener.class);
+    bundleProgressListener = mock(ClassicBundleManager.BundleProgressListener.class);
     mockScheduler = mock(Scheduler.class);
     bundleManager =
-        new BundleManager<>(
+        new ClassicBundleManager<>(
             bundleProgressListener,
             mockFutureCollector,
             MAX_BUNDLE_SIZE,
@@ -307,8 +307,8 @@ public final class BundleManagerTest {
 
   @Test
   public void testProcessTimerWithBundleTimeElapsed() {
-    BundleManager<String> bundleManager =
-        new BundleManager<>(
+    ClassicBundleManager<String> bundleManager =
+        new ClassicBundleManager<>(
             bundleProgressListener,
             mockFutureCollector,
             MAX_BUNDLE_SIZE,