You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by xi...@apache.org on 2023/04/13 21:40:44 UTC
[beam] branch master updated: Extract BundleManager to an Interface in SamzaRunner (#26268)
This is an automated email from the ASF dual-hosted git repository.
xinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 5a9ab685d9b Extract BundleManager to an Interface in SamzaRunner (#26268)
5a9ab685d9b is described below
commit 5a9ab685d9bb7a1df901f535cfb7b5c106fbf927
Author: Katie Liu <ka...@linkedin.com>
AuthorDate: Thu Apr 13 14:40:36 2023 -0700
Extract BundleManager to an Interface in SamzaRunner (#26268)
---
.../beam/runners/samza/runtime/BundleManager.java | 313 +--------------------
...undleManager.java => ClassicBundleManager.java} | 39 +--
.../apache/beam/runners/samza/runtime/DoFnOp.java | 2 +-
...agerTest.java => ClassicBundleManagerTest.java} | 16 +-
4 files changed, 29 insertions(+), 341 deletions(-)
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/BundleManager.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/BundleManager.java
index 873c5f3005f..1c89f770952 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/BundleManager.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/BundleManager.java
@@ -17,317 +17,18 @@
*/
package org.apache.beam.runners.samza.runtime;
-import java.util.Collection;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionStage;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.BiConsumer;
-import javax.annotation.Nullable;
-import org.apache.beam.runners.core.StateNamespaces;
-import org.apache.beam.runners.core.TimerInternals;
-import org.apache.beam.sdk.state.TimeDomain;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
-import org.apache.samza.operators.Scheduler;
-import org.joda.time.Duration;
import org.joda.time.Instant;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-/**
- * Bundle management for the {@link DoFnOp} that handles lifecycle of a bundle. It also serves as a
- * proxy for the {@link DoFnOp} to process watermark and decides to 1. Hold watermark if there is at
- * least one bundle in progress. 2. Propagates the watermark to downstream DAG, if all the previous
- * bundles have completed.
- *
- * <p>A bundle is considered complete only when the outputs corresponding to each element in the
- * bundle have been resolved and the watermark associated with the bundle(if any) is propagated
- * downstream. The output of an element is considered resolved based on the nature of the ParDoFn 1.
- * In case of synchronous ParDo, outputs of the element is resolved immediately after the
- * processElement returns. 2. In case of asynchronous ParDo, outputs of the element is resolved when
- * all the future emitted by the processElement is resolved.
- *
- * <p>This class is not thread safe and the current implementation relies on the assumption that
- * messages are dispatched to BundleManager in a single threaded mode.
- *
- * @param <OutT> output type of the {@link DoFnOp}
- */
-@SuppressWarnings({
- "nullness" // TODO(https://github.com/apache/beam/issues/20497)
-})
-public class BundleManager<OutT> {
- private static final Logger LOG = LoggerFactory.getLogger(BundleManager.class);
- private static final long MIN_BUNDLE_CHECK_TIME_MS = 10L;
-
- private final long maxBundleSize;
- private final long maxBundleTimeMs;
- private final BundleProgressListener<OutT> bundleProgressListener;
- private final FutureCollector<OutT> futureCollector;
- private final Scheduler<KeyedTimerData<Void>> bundleTimerScheduler;
- private final String bundleCheckTimerId;
-
- // Number elements belonging to the current active bundle
- private transient AtomicLong currentBundleElementCount;
- // Number of bundles that are in progress but not yet finished
- private transient AtomicLong pendingBundleCount;
- // Denotes the start time of the current active bundle
- private transient AtomicLong bundleStartTime;
- // Denotes if there is an active in progress bundle. Note at a given time, we can have multiple
- // bundle in progress.
- // This flag denotes if there is a bundle that is current and hasn't been closed.
- private transient AtomicBoolean isBundleStarted;
- // Holder for watermark which gets propagated when the bundle is finished.
- private transient Instant bundleWatermarkHold;
- // A future that is completed once all futures belonging to the current active bundle are
- // completed. The value is null if there are no futures in the current active bundle.
- private transient AtomicReference<CompletableFuture<Void>> currentActiveBundleDoneFutureReference;
- private transient CompletionStage<Void> watermarkFuture;
-
- public BundleManager(
- BundleProgressListener<OutT> bundleProgressListener,
- FutureCollector<OutT> futureCollector,
- long maxBundleSize,
- long maxBundleTimeMs,
- Scheduler<KeyedTimerData<Void>> bundleTimerScheduler,
- String bundleCheckTimerId) {
- this.maxBundleSize = maxBundleSize;
- this.maxBundleTimeMs = maxBundleTimeMs;
- this.bundleProgressListener = bundleProgressListener;
- this.bundleTimerScheduler = bundleTimerScheduler;
- this.bundleCheckTimerId = bundleCheckTimerId;
- this.futureCollector = futureCollector;
-
- if (maxBundleSize > 1) {
- scheduleNextBundleCheck();
- }
-
- // instance variable initialization for bundle tracking
- this.bundleStartTime = new AtomicLong(Long.MAX_VALUE);
- this.currentActiveBundleDoneFutureReference = new AtomicReference<>();
- this.currentBundleElementCount = new AtomicLong(0L);
- this.isBundleStarted = new AtomicBoolean(false);
- this.pendingBundleCount = new AtomicLong(0L);
- this.watermarkFuture = CompletableFuture.completedFuture(null);
- }
-
- /*
- * Schedule in processing time to check whether the current bundle should be closed. Note that
- * we only approximately achieve max bundle time by checking as frequent as half of the max bundle
- * time set by users. This would violate the max bundle time by up to half of it but should
- * acceptable in most cases (and cheaper than scheduling a timer at the beginning of every bundle).
- */
- private void scheduleNextBundleCheck() {
- final Instant nextBundleCheckTime =
- Instant.now().plus(Duration.millis(maxBundleTimeMs / 2 + MIN_BUNDLE_CHECK_TIME_MS));
- final TimerInternals.TimerData timerData =
- TimerInternals.TimerData.of(
- this.bundleCheckTimerId,
- StateNamespaces.global(),
- nextBundleCheckTime,
- nextBundleCheckTime,
- TimeDomain.PROCESSING_TIME);
- bundleTimerScheduler.schedule(
- new KeyedTimerData<>(new byte[0], null, timerData), nextBundleCheckTime.getMillis());
- }
-
- void tryStartBundle() {
- futureCollector.prepare();
-
- if (isBundleStarted.compareAndSet(false, true)) {
- LOG.debug("Starting a new bundle.");
- // make sure the previous bundle is sealed and futures are cleared
- Preconditions.checkArgument(
- currentActiveBundleDoneFutureReference.get() == null,
- "Current active bundle done future should be null before starting a new bundle.");
- bundleStartTime.set(System.currentTimeMillis());
- pendingBundleCount.incrementAndGet();
- bundleProgressListener.onBundleStarted();
- }
-
- currentBundleElementCount.incrementAndGet();
- }
-
- void processWatermark(Instant watermark, OpEmitter<OutT> emitter) {
- // propagate watermark immediately if no bundle is in progress and all the previous bundles have
- // completed.
- if (!isBundleStarted() && pendingBundleCount.get() == 0) {
- LOG.debug("Propagating watermark: {} directly since no bundle in progress.", watermark);
- bundleProgressListener.onWatermark(watermark, emitter);
- return;
- }
-
- // hold back the watermark since there is either a bundle in progress or previously closed
- // bundles are unfinished.
- this.bundleWatermarkHold = watermark;
-
- // for batch mode, the max watermark should force the bundle to close
- if (BoundedWindow.TIMESTAMP_MAX_VALUE.equals(watermark)) {
- /*
- * Due to lack of async watermark function, we block on the previous watermark futures before propagating the watermark
- * downstream. If a bundle is in progress tryFinishBundle() fill force the bundle to close and emit watermark.
- * If no bundle in progress, we progress watermark explicitly after the completion of previous watermark futures.
- */
- if (isBundleStarted()) {
- LOG.info(
- "Received max watermark. Triggering finish bundle before flushing the watermark downstream.");
- tryFinishBundle(emitter);
- watermarkFuture.toCompletableFuture().join();
- } else {
- LOG.info(
- "Received max watermark. Waiting for previous bundles to complete before flushing the watermark downstream.");
- watermarkFuture.toCompletableFuture().join();
- bundleProgressListener.onWatermark(watermark, emitter);
- }
- }
- }
-
- void processTimer(KeyedTimerData<Void> keyedTimerData, OpEmitter<OutT> emitter) {
- // this is internal timer in processing time to check whether a bundle should be closed
- if (bundleCheckTimerId.equals(keyedTimerData.getTimerData().getTimerId())) {
- tryFinishBundle(emitter);
- scheduleNextBundleCheck();
- }
- }
-
- /**
- * Signal the bundle manager to handle failure. We discard the output collected as part of
- * processing the current element and reset the bundle count.
- *
- * @param t failure cause
- */
- void signalFailure(Throwable t) {
- LOG.error("Encountered error during processing the message. Discarding the output due to: ", t);
- futureCollector.discard();
- // reset the bundle start flag only if the bundle has started
- isBundleStarted.compareAndSet(true, false);
+public interface BundleManager<OutT> {
+ void tryStartBundle();
- // bundle start may not necessarily mean we have actually started the bundle since some of the
- // invariant check conditions within bundle start could throw exceptions. so rely on bundle
- // start time
- if (bundleStartTime.get() != Long.MAX_VALUE) {
- currentBundleElementCount.set(0L);
- bundleStartTime.set(Long.MAX_VALUE);
- pendingBundleCount.decrementAndGet();
- currentActiveBundleDoneFutureReference.set(null);
- }
- }
-
- void tryFinishBundle(OpEmitter<OutT> emitter) {
-
- // we need to seal the output for each element within a bundle irrespective of the whether we
- // decide to finish the
- // bundle or not
- CompletionStage<Collection<WindowedValue<OutT>>> outputFuture = futureCollector.finish();
-
- if (shouldFinishBundle() && isBundleStarted.compareAndSet(true, false)) {
- LOG.debug("Finishing the current bundle.");
-
- // reset the bundle count
- // seal the bundle and emit the result future (collection of results)
- // chain the finish bundle invocation on the finish bundle
- currentBundleElementCount.set(0L);
- bundleStartTime.set(Long.MAX_VALUE);
- Instant watermarkHold = bundleWatermarkHold;
- bundleWatermarkHold = null;
-
- CompletionStage<Void> currentActiveBundleDoneFuture =
- currentActiveBundleDoneFutureReference.get();
- outputFuture =
- outputFuture.thenCombine(
- currentActiveBundleDoneFuture != null
- ? currentActiveBundleDoneFuture
- : CompletableFuture.completedFuture(null),
- (res, ignored) -> {
- bundleProgressListener.onBundleFinished(emitter);
- return res;
- });
-
- BiConsumer<Collection<WindowedValue<OutT>>, Void> watermarkPropagationFn;
- if (watermarkHold == null) {
- watermarkPropagationFn = (ignored, res) -> pendingBundleCount.decrementAndGet();
- } else {
- watermarkPropagationFn =
- (ignored, res) -> {
- LOG.debug("Propagating watermark: {} to downstream.", watermarkHold);
- bundleProgressListener.onWatermark(watermarkHold, emitter);
- pendingBundleCount.decrementAndGet();
- };
- }
+ void processWatermark(Instant watermark, OpEmitter<OutT> emitter);
- // We chain the current watermark emission with previous watermark and the output futures
- // since bundles can finish out of order but we still want the watermark to be emitted in
- // order.
- watermarkFuture = outputFuture.thenAcceptBoth(watermarkFuture, watermarkPropagationFn);
- currentActiveBundleDoneFutureReference.set(null);
- } else if (isBundleStarted.get()) {
- final CompletableFuture<Collection<WindowedValue<OutT>>> finalOutputFuture =
- outputFuture.toCompletableFuture();
- currentActiveBundleDoneFutureReference.updateAndGet(
- maybePrevFuture -> {
- CompletableFuture<Void> prevFuture =
- maybePrevFuture != null ? maybePrevFuture : CompletableFuture.completedFuture(null);
+ void processTimer(KeyedTimerData<Void> keyedTimerData, OpEmitter<OutT> emitter);
- return CompletableFuture.allOf(prevFuture, finalOutputFuture);
- });
- }
+ void signalFailure(Throwable t);
- // emit the future to the propagate it to rest of the DAG
- emitter.emitFuture(outputFuture);
- }
-
- @VisibleForTesting
- long getCurrentBundleElementCount() {
- return currentBundleElementCount.longValue();
- }
-
- @VisibleForTesting
- @Nullable
- CompletionStage<Void> getCurrentBundleDoneFuture() {
- return currentActiveBundleDoneFutureReference.get();
- }
-
- @VisibleForTesting
- void setCurrentBundleDoneFuture(CompletableFuture<Void> currentBundleResultFuture) {
- this.currentActiveBundleDoneFutureReference.set(currentBundleResultFuture);
- }
-
- @VisibleForTesting
- long getPendingBundleCount() {
- return pendingBundleCount.longValue();
- }
-
- @VisibleForTesting
- void setPendingBundleCount(long value) {
- pendingBundleCount.set(value);
- }
-
- @VisibleForTesting
- boolean isBundleStarted() {
- return isBundleStarted.get();
- }
-
- @VisibleForTesting
- void setBundleWatermarkHold(Instant watermark) {
- this.bundleWatermarkHold = watermark;
- }
-
- /**
- * We close the current bundle in progress if one of the following criteria is met 1. The bundle
- * count ≥ maxBundleSize 2. Time elapsed since the bundle started is ≥ maxBundleTimeMs 3.
- * Watermark hold equals to TIMESTAMP_MAX_VALUE which usually is the case for bounded jobs
- *
- * @return true - if one of the criteria above is satisfied; false - otherwise
- */
- private boolean shouldFinishBundle() {
- return isBundleStarted.get()
- && (currentBundleElementCount.get() >= maxBundleSize
- || System.currentTimeMillis() - bundleStartTime.get() >= maxBundleTimeMs
- || BoundedWindow.TIMESTAMP_MAX_VALUE.equals(bundleWatermarkHold));
- }
+ void tryFinishBundle(OpEmitter<OutT> emitter);
/**
* A listener used to track the lifecycle of a bundle. Typically, the lifecycle of a bundle
@@ -339,7 +40,7 @@ public class BundleManager<OutT> {
*
* @param <OutT>
*/
- public interface BundleProgressListener<OutT> {
+ interface BundleProgressListener<OutT> {
void onBundleStarted();
void onBundleFinished(OpEmitter<OutT> emitter);
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/BundleManager.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/ClassicBundleManager.java
similarity index 92%
copy from runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/BundleManager.java
copy to runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/ClassicBundleManager.java
index 873c5f3005f..4a90e468824 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/BundleManager.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/ClassicBundleManager.java
@@ -59,8 +59,8 @@ import org.slf4j.LoggerFactory;
@SuppressWarnings({
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
-public class BundleManager<OutT> {
- private static final Logger LOG = LoggerFactory.getLogger(BundleManager.class);
+public class ClassicBundleManager<OutT> implements BundleManager<OutT> {
+ private static final Logger LOG = LoggerFactory.getLogger(ClassicBundleManager.class);
private static final long MIN_BUNDLE_CHECK_TIME_MS = 10L;
private final long maxBundleSize;
@@ -87,7 +87,7 @@ public class BundleManager<OutT> {
private transient AtomicReference<CompletableFuture<Void>> currentActiveBundleDoneFutureReference;
private transient CompletionStage<Void> watermarkFuture;
- public BundleManager(
+ public ClassicBundleManager(
BundleProgressListener<OutT> bundleProgressListener,
FutureCollector<OutT> futureCollector,
long maxBundleSize,
@@ -134,7 +134,8 @@ public class BundleManager<OutT> {
new KeyedTimerData<>(new byte[0], null, timerData), nextBundleCheckTime.getMillis());
}
- void tryStartBundle() {
+ @Override
+ public void tryStartBundle() {
futureCollector.prepare();
if (isBundleStarted.compareAndSet(false, true)) {
@@ -151,7 +152,8 @@ public class BundleManager<OutT> {
currentBundleElementCount.incrementAndGet();
}
- void processWatermark(Instant watermark, OpEmitter<OutT> emitter) {
+ @Override
+ public void processWatermark(Instant watermark, OpEmitter<OutT> emitter) {
// propagate watermark immediately if no bundle is in progress and all the previous bundles have
// completed.
if (!isBundleStarted() && pendingBundleCount.get() == 0) {
@@ -185,7 +187,8 @@ public class BundleManager<OutT> {
}
}
- void processTimer(KeyedTimerData<Void> keyedTimerData, OpEmitter<OutT> emitter) {
+ @Override
+ public void processTimer(KeyedTimerData<Void> keyedTimerData, OpEmitter<OutT> emitter) {
// this is internal timer in processing time to check whether a bundle should be closed
if (bundleCheckTimerId.equals(keyedTimerData.getTimerData().getTimerId())) {
tryFinishBundle(emitter);
@@ -199,7 +202,8 @@ public class BundleManager<OutT> {
*
* @param t failure cause
*/
- void signalFailure(Throwable t) {
+ @Override
+ public void signalFailure(Throwable t) {
LOG.error("Encountered error during processing the message. Discarding the output due to: ", t);
futureCollector.discard();
// reset the bundle start flag only if the bundle has started
@@ -216,7 +220,8 @@ public class BundleManager<OutT> {
}
}
- void tryFinishBundle(OpEmitter<OutT> emitter) {
+ @Override
+ public void tryFinishBundle(OpEmitter<OutT> emitter) {
// we need to seal the output for each element within a bundle irrespective of the whether we
// decide to finish the
@@ -328,22 +333,4 @@ public class BundleManager<OutT> {
|| System.currentTimeMillis() - bundleStartTime.get() >= maxBundleTimeMs
|| BoundedWindow.TIMESTAMP_MAX_VALUE.equals(bundleWatermarkHold));
}
-
- /**
- * A listener used to track the lifecycle of a bundle. Typically, the lifecycle of a bundle
- * consists of 1. Start bundle - Invoked when the bundle is started 2. Finish bundle - Invoked
- * when the bundle is complete. Refer to the docs under {@link BundleManager} for definition on
- * when a bundle is considered complete. 3. onWatermark - Invoked when watermark is ready to be
- * propagated to downstream DAG. Refer to the docs under {@link BundleManager} on when watermark
- * is held vs propagated.
- *
- * @param <OutT>
- */
- public interface BundleProgressListener<OutT> {
- void onBundleStarted();
-
- void onBundleFinished(OpEmitter<OutT> emitter);
-
- void onWatermark(Instant watermark, OpEmitter<OutT> emitter);
- }
}
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java
index 35661ae86fe..c693754b5b9 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java
@@ -200,7 +200,7 @@ public class DoFnOp<InT, FnOutT, OutT> implements Op<InT, OutT, Void> {
final FutureCollector<OutT> outputFutureCollector = createFutureCollector();
this.bundleManager =
- new BundleManager<>(
+ new ClassicBundleManager<>(
createBundleProgressListener(),
outputFutureCollector,
samzaPipelineOptions.getMaxBundleSize(),
diff --git a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/BundleManagerTest.java b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/ClassicBundleManagerTest.java
similarity index 97%
rename from runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/BundleManagerTest.java
rename to runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/ClassicBundleManagerTest.java
index 91422097e83..f8f30a8d2f8 100644
--- a/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/BundleManagerTest.java
+++ b/runners/samza/src/test/java/org/apache/beam/runners/samza/runtime/ClassicBundleManagerTest.java
@@ -42,24 +42,24 @@ import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
-/** Unit tests for {@linkplain BundleManager}. */
-public final class BundleManagerTest {
+/** Unit tests for {@linkplain ClassicBundleManager}. */
+public final class ClassicBundleManagerTest {
private static final long MAX_BUNDLE_SIZE = 3;
private static final long MAX_BUNDLE_TIME_MS = 2000;
private static final String BUNDLE_CHECK_TIMER_ID = "bundle-check-test-timer";
private FutureCollector<String> mockFutureCollector;
- private BundleManager<String> bundleManager;
- private BundleManager.BundleProgressListener<String> bundleProgressListener;
+ private ClassicBundleManager<String> bundleManager;
+ private ClassicBundleManager.BundleProgressListener<String> bundleProgressListener;
private Scheduler<KeyedTimerData<Void>> mockScheduler;
@Before
public void setUp() {
mockFutureCollector = mock(FutureCollector.class);
- bundleProgressListener = mock(BundleManager.BundleProgressListener.class);
+ bundleProgressListener = mock(ClassicBundleManager.BundleProgressListener.class);
mockScheduler = mock(Scheduler.class);
bundleManager =
- new BundleManager<>(
+ new ClassicBundleManager<>(
bundleProgressListener,
mockFutureCollector,
MAX_BUNDLE_SIZE,
@@ -307,8 +307,8 @@ public final class BundleManagerTest {
@Test
public void testProcessTimerWithBundleTimeElapsed() {
- BundleManager<String> bundleManager =
- new BundleManager<>(
+ ClassicBundleManager<String> bundleManager =
+ new ClassicBundleManager<>(
bundleProgressListener,
mockFutureCollector,
MAX_BUNDLE_SIZE,