You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2018/11/09 17:42:30 UTC
[beam] branch master updated: [BEAM-3741] Proto changes for
reporting backlog/splitting/finalizing bundles. (#6963)
This is an automated email from the ASF dual-hosted git repository.
lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 17c2da6 [BEAM-3741] Proto changes for reporting backlog/splitting/finalizing bundles. (#6963)
17c2da6 is described below
commit 17c2da6d981cae9f233aea1e2d6d64259362dd73
Author: Lukasz Cwik <lc...@google.com>
AuthorDate: Fri Nov 9 09:42:22 2018 -0800
[BEAM-3741] Proto changes for reporting backlog/splitting/finalizing bundles. (#6963)
* [BEAM-3741] Proto changes for reporting backlog/splitting/finalizing bundles.
This change contains the recommended proto changes from:
* https://s.apache.org/beam-finalizing-bundles
* https://s.apache.org/beam-bundles-backlog-splitting
* https://s.apache.org/beam-checkpoint-and-split-bundles
---
.../fn-execution/src/main/proto/beam_fn_api.proto | 176 ++++++++++++++-------
.../pipeline/src/main/proto/beam_runner_api.proto | 3 +
.../portable/RemoteStageEvaluatorFactory.java | 2 +-
.../SplittableRemoteStageEvaluatorFactory.java | 13 +-
.../direct/portable/ReferenceRunnerTest.java | 2 +
.../functions/FlinkExecutableStageFunction.java | 2 +-
.../streaming/ExecutableStageDoFnOperator.java | 2 +-
.../fnexecution/control/BundleProgressHandler.java | 16 +-
.../splittabledofn/SDFFeederViaStateAndTimers.java | 41 +++--
.../fnexecution/control/RemoteExecutionTest.java | 12 +-
.../fnexecution/control/SdkHarnessClientTest.java | 4 +-
.../harness/SplittableProcessElementsRunner.java | 20 ++-
.../fn/harness/control/BundleSplitListener.java | 6 +-
.../fn/harness/control/ProcessBundleHandler.java | 25 ++-
14 files changed, 188 insertions(+), 136 deletions(-)
diff --git a/model/fn-execution/src/main/proto/beam_fn_api.proto b/model/fn-execution/src/main/proto/beam_fn_api.proto
index 915686d..39229b1 100644
--- a/model/fn-execution/src/main/proto/beam_fn_api.proto
+++ b/model/fn-execution/src/main/proto/beam_fn_api.proto
@@ -119,6 +119,7 @@ message InstructionRequest {
ProcessBundleRequest process_bundle = 1001;
ProcessBundleProgressRequest process_bundle_progress = 1002;
ProcessBundleSplitRequest process_bundle_split = 1003;
+ FinalizeBundleRequest finalize_bundle = 1004;
}
}
@@ -142,6 +143,7 @@ message InstructionResponse {
ProcessBundleResponse process_bundle = 1001;
ProcessBundleProgressResponse process_bundle_progress = 1002;
ProcessBundleSplitResponse process_bundle_split = 1003;
+ FinalizeBundleResponse finalize_bundle = 1004;
}
}
@@ -184,55 +186,78 @@ message ProcessBundleDescriptor {
org.apache.beam.model.pipeline.v1.ApiServiceDescriptor state_api_service_descriptor = 7;
}
-// Represents a partition of the bundle into two bundles: a "primary" and
-// a "residual", with the following properties:
-// - The work in primary and residual doesn't overlap, and combined, adds up
-// to the work in the current bundle if the split hadn't happened.
-// - The current bundle, if it keeps executing, will have done none of the
-// work under residual roots.
-// - The current bundle, if no further splits happen, will have done exactly
-// the work under primary_roots.
-// For more rigorous definitions see https://s.apache.org/beam-breaking-fusion
-message BundleSplit {
- // One of the root applications specifying the scope of work for a bundle.
- message Application {
- // (Required) The primitive transform to which to pass the element
- string ptransform_id = 1;
+// One of the applications specifying the scope of work for a bundle.
+// See https://docs.google.com/document/d/1tUDb45sStdR8u7-jBkGdw3OGFK7aa2-V7eo86zYSE_4/edit#heading=h.9g3g5weg2u9 for further details.
+message BundleApplication {
+ // (Required) The primitive transform to which to pass the element
+ string ptransform_id = 1;
+
+ // (Required) Name of the transform's input to which to pass the element.
+ string input_id = 2;
- // (Required) Name of the transform's input to which to pass the element.
- string input_id = 2;
+ // (Required) The encoded element to pass to the transform.
+ bytes element = 3;
- // (Required) The encoded element to pass to the transform.
- bytes element = 3;
+ // The map is keyed by the local output name of the PTransform. Each
+ // value represents a lower bound on the timestamps of elements that
+ // are produced by this PTransform into each of its output PCollections
+ // when invoked with this application.
+ map<string, google.protobuf.Timestamp> output_watermarks = 4;
- // Approximate lower bounds on timestamps of elements that this PTransform
- // will produce into each of its output PCollections, when invoked on this
- // element. Keyed by the transform's local output name.
- map<string, int64> output_watermarks = 4;
+ // Represents an estimate for the amount of currently outstanding work.
+ message Backlog {
+ // This informs Runners on how to aggregate the backlog
+ // being reported across multiple active bundles. Backlogs
+ // are aggregated using the set of partitions.
+ //
+ // For example SplittableDoFn's which consume elements from:
+ // * a globally shared resource such as a Pubsub queue should set this
+ // to “”.
+ // * a shared partitioned resource should use the partition identifier.
+ // * a uniquely partitioned resource such as a file range should set this to
+ // file name + start offset.
+ bytes partition = 1;
+
+ // The estimate for the backlog.
+ oneof value {
+ // Represents an estimate for the amount of outstanding work. Values
+ // compare lexicographically.
+ bytes bytes = 1000;
- // Approximate fraction of all work of the current bundle (before split)
- // represented by invoking this Application and its downstream applications.
- // The sum of fraction_of_work between all primary_roots and residual_roots
- // must add up to approximately 1.0.
- google.protobuf.DoubleValue fraction_of_work = 5;
+ // Whether the backlog is unknown.
+ bool is_unknown = 1001;
+ }
}
- // An an Application should be scheduled after a delay.
- message DelayedApplication {
- // The delay in seconds (lower bound).
- double delay_sec = 1;
+ // (Required) An estimate for the amount outstanding work related to
+ // this application.
+ Backlog backlog = 5;
- // (Required) The application that should be scheduled.
- Application application = 2;
- }
+ // (Required) Whether this application potentially produces an unbounded
+ // amount of data. Note that this should only be set to BOUNDED if and
+ // only if the application is known to produce a finite amount of output.
+ //
+ // Note that this is different from the backlog as the backlog represents
+ // how much work there is currently outstanding.
+ org.apache.beam.model.pipeline.v1.IsBounded.Enum is_bounded = 6;
- // Root applications that should replace the current bundle.
- repeated Application primary_roots = 1;
+ // Contains additional monitoring information related to this application.
+ //
+ // Each application is able to report information that some runners
+ // will use consume when providing a UI or for making scaling and performance
+ // decisions. See https://s.apache.org/beam-bundles-backlog-splitting for
+ // details about what types of signals may be useful to report.
+ repeated MonitoringInfo monitoring_infos = 7;
+}
- // Root applications that have been removed from the current bundle and
- // have to be executed in a separate bundle (e.g. in parallel on a different
- // worker, or after the current bundle completes, etc.)
- repeated DelayedApplication residual_roots = 2;
+// An Application should be scheduled for execution after a delay.
+message DelayedBundleApplication {
+ // Recommended time at which the application should be scheduled to execute
+ // by the runner. Times in the past may be scheduled to execute immediately.
+ google.protobuf.Timestamp requested_execution_time = 1;
+
+ // (Required) The application that should be scheduled.
+ BundleApplication application = 2;
}
// A request to process a given bundle.
@@ -247,20 +272,25 @@ message ProcessBundleRequest {
repeated bytes cache_tokens = 2;
}
-// Stable
message ProcessBundleResponse {
// (Optional) If metrics reporting is supported by the SDK, this represents
// the final metrics to record for this bundle.
// DEPRECATED
Metrics metrics = 1;
- // (Optional) Specifies that the bundle has been split since the last
- // ProcessBundleProgressResponse was sent.
- BundleSplit split = 2;
+ // (Optional) Specifies that the bundle has not been completed and the
+ // following applications need to be scheduled and executed in the future.
+ repeated DelayedBundleApplication residual_roots = 2;
// (Required) The list of metrics or other MonitoredState
// collected while processing this bundle.
repeated MonitoringInfo monitoring_infos = 3;
+
+ // (Optional) Specifies that the runner must callback to this worker
+ // once the output of the bundle is committed. The Runner must send a
+ // FinalizeBundleRequest with the instruction id of the ProcessBundleRequest
+ // that is related to this ProcessBundleResponse.
+ bool requires_finalization = 4;
}
// A request to report progress information for a given bundle.
@@ -310,7 +340,7 @@ message MonitoringInfo {
map<string, string> labels = 5;
// The walltime of the most recent update.
- // Useful for aggregation for Latest types such as LatestInt64.
+ // Useful for aggregation for latest types such as LatestInt64.
google.protobuf.Timestamp timestamp = 6;
}
@@ -573,36 +603,62 @@ message ProcessBundleProgressResponse {
// DEPRECATED (Required)
Metrics metrics = 1;
- // (Optional) Specifies that the bundle has been split since the last
- // ProcessBundleProgressResponse was sent.
- BundleSplit split = 2;
-
// (Required) The list of metrics or other MonitoredState
// collected while processing this bundle.
repeated MonitoringInfo monitoring_infos = 3;
+
+ // The list of currently active primary roots that are being
+ // executed. Required to be populated for PTransforms which can be split.
+ repeated BundleApplication primary_roots = 4;
}
+// Represents a request to the SDK to split a currently active bundle.
message ProcessBundleSplitRequest {
// (Required) A reference to an active process bundle request with the given
// instruction id.
string instruction_reference = 1;
- // Specifies that the runner would like the bundle to split itself using
- // BundleSplit, and give up some of the work that the bundle hasn't started
- // doing yet, so that it can be done in a separate bundle (perhaps in
- // parallel with the current bundle).
+ // (Required) Specifies that the Runner would like the bundle to split itself
+ // such that it performs no more work than the backlog specified for each
+ // PTransform. The interpretation of how much work should be processed is up
+ // to the PTransform.
//
- // The value is the fraction of unstarted work to keep. E.g. 0 means give up
- // as much as possible of unstarted work (e.g. checkpoint), 0.5 means give
- // up about half of the unstarted work, etc.
- // This is a hint and the value is approximate.
+ // For example, A backlog of "" tells the SDK to perform as little work as
+ // possible, effectively checkpointing when able. The remaining backlog
+ // will be relative to the backlog reported during processing.
//
- // The value is relative to the current scope of work of the bundle.
- google.protobuf.DoubleValue fraction_of_remainder = 2;
+ // If the backlog is unspecified for a PTransform, the runner would like
+ // the SDK to process all data received for that PTransform.
+ map<string, bytes> backlog_remaining = 2;
}
+// Represents a partition of the bundle: a "primary" and
+// a "residual", with the following properties:
+// - The work in primary and residual doesn't overlap, and combined, adds up
+// to the work in the current bundle if the split hadn't happened.
+// - The current bundle, if it keeps executing, will have done none of the
+// work under residual_roots.
+// - The current bundle, if no further splits happen, will have done exactly
+// the work under primary_roots.
+// For more rigorous definitions see https://s.apache.org/beam-breaking-fusion
message ProcessBundleSplitResponse {
- // Empty.
+ // Root applications that should replace the current bundle.
+ repeated BundleApplication primary_roots = 1;
+
+ // Root applications that have been removed from the current bundle and
+ // have to be executed in a separate bundle (e.g. in parallel on a different
+ // worker, or after the current bundle completes, etc.)
+ repeated DelayedBundleApplication residual_roots = 2;
+}
+
+message FinalizeBundleRequest {
+ // (Required) A reference to a completed process bundle request with the given
+ // instruction id.
+ string instruction_reference = 1;
+}
+
+message FinalizeBundleResponse {
+ // Empty
}
/*
diff --git a/model/pipeline/src/main/proto/beam_runner_api.proto b/model/pipeline/src/main/proto/beam_runner_api.proto
index 4b537b0..cbf3941 100644
--- a/model/pipeline/src/main/proto/beam_runner_api.proto
+++ b/model/pipeline/src/main/proto/beam_runner_api.proto
@@ -462,6 +462,9 @@ message ParDoPayload {
// (Required if splittable == true) Id of the restriction coder.
string restriction_coder_id = 7;
+
+ // (Optional) Only set when this ParDo can request bundle finalization.
+ bool requests_finalization = 8;
}
// Parameters that a UDF might require.
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/RemoteStageEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/RemoteStageEvaluatorFactory.java
index 7089576..fd42f2f 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/RemoteStageEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/RemoteStageEvaluatorFactory.java
@@ -76,7 +76,7 @@ class RemoteStageEvaluatorFactory implements TransformEvaluatorFactory {
BundleFactoryOutputReceiverFactory.create(
bundleFactory, stage.getComponents(), outputs::add),
StateRequestHandler.unsupported(),
- BundleProgressHandler.unsupported());
+ BundleProgressHandler.ignored());
// TODO(BEAM-4680): Add support for timers as inputs to the ULR
this.mainInput = Iterables.getOnlyElement(bundle.getInputReceivers().values());
}
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/SplittableRemoteStageEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/SplittableRemoteStageEvaluatorFactory.java
index 88fcae1..2993d23 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/SplittableRemoteStageEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/portable/SplittableRemoteStageEvaluatorFactory.java
@@ -130,20 +130,13 @@ class SplittableRemoteStageEvaluatorFactory implements TransformEvaluatorFactory
BundleFactoryOutputReceiverFactory.create(
bundleFactory, stage.getComponents(), outputs::add),
StateRequestHandler.unsupported(),
+ // TODO: Wire in splitting via a split listener
new BundleProgressHandler() {
@Override
- public void onProgress(ProcessBundleProgressResponse progress) {
- if (progress.hasSplit()) {
- feeder.split(progress.getSplit());
- }
- }
+ public void onProgress(ProcessBundleProgressResponse progress) {}
@Override
- public void onCompleted(ProcessBundleResponse response) {
- if (response.hasSplit()) {
- feeder.split(response.getSplit());
- }
- }
+ public void onCompleted(ProcessBundleResponse response) {}
});
this.mainInput = Iterables.getOnlyElement(bundle.getInputReceivers().values());
}
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/ReferenceRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/ReferenceRunnerTest.java
index 9b5e762..fe6dd9c 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/ReferenceRunnerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/portable/ReferenceRunnerTest.java
@@ -57,6 +57,7 @@ import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.joda.time.Duration;
import org.joda.time.Instant;
+import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -176,6 +177,7 @@ public class ReferenceRunnerTest implements Serializable {
}
@Test
+ @Ignore("TODO: BEAM-3743")
public void testSDF() throws Exception {
Pipeline p = Pipeline.create();
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
index 51acb84..17b7e53 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
@@ -125,7 +125,7 @@ public class FlinkExecutableStageFunction<InputT> extends AbstractRichFunction
stateRequestHandler =
getStateRequestHandler(
executableStage, stageBundleFactory.getProcessBundleDescriptor(), runtimeContext);
- progressHandler = BundleProgressHandler.unsupported();
+ progressHandler = BundleProgressHandler.ignored();
}
private StateRequestHandler getStateRequestHandler(
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
index 5df0845..eb64611 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
@@ -151,7 +151,7 @@ public class ExecutableStageDoFnOperator<InputT, OutputT> extends DoFnOperator<I
stageBundleFactory = stageContext.getStageBundleFactory(executableStage);
stateRequestHandler = getStateRequestHandler(executableStage);
- progressHandler = BundleProgressHandler.unsupported();
+ progressHandler = BundleProgressHandler.ignored();
outputQueue = new LinkedBlockingQueue<>();
}
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/BundleProgressHandler.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/BundleProgressHandler.java
index bc7ca38..5846bdf 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/BundleProgressHandler.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/BundleProgressHandler.java
@@ -33,22 +33,14 @@ public interface BundleProgressHandler {
/** Handles the bundle's completion report. */
void onCompleted(ProcessBundleResponse response);
- /** Returns a handler that ignores metrics and throws on splits (as splits can not be ignored). */
- static BundleProgressHandler unsupported() {
+ /** Returns a handler that ignores metrics. */
+ static BundleProgressHandler ignored() {
return new BundleProgressHandler() {
@Override
- public void onProgress(ProcessBundleProgressResponse progress) {
- if (progress.hasSplit()) {
- throw new UnsupportedOperationException("Splitting not yet supported");
- }
- }
+ public void onProgress(ProcessBundleProgressResponse progress) {}
@Override
- public void onCompleted(ProcessBundleResponse response) {
- if (response.hasSplit()) {
- throw new UnsupportedOperationException("Splitting not yet supported");
- }
- }
+ public void onCompleted(ProcessBundleResponse response) {}
};
}
}
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/splittabledofn/SDFFeederViaStateAndTimers.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/splittabledofn/SDFFeederViaStateAndTimers.java
index d1d67a0..deef29a 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/splittabledofn/SDFFeederViaStateAndTimers.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/splittabledofn/SDFFeederViaStateAndTimers.java
@@ -23,8 +23,8 @@ import static com.google.common.base.Preconditions.checkState;
import com.google.common.collect.Iterables;
import java.io.IOException;
import java.util.List;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.BundleSplit;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.BundleSplit.DelayedApplication;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.BundleApplication;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.DelayedBundleApplication;
import org.apache.beam.runners.core.StateInternals;
import org.apache.beam.runners.core.StateNamespace;
import org.apache.beam.runners.core.StateNamespaces;
@@ -45,7 +45,7 @@ import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.protobuf.v3.com.google.protobuf.ByteString;
-import org.joda.time.Duration;
+import org.apache.beam.vendor.protobuf.v3.com.google.protobuf.util.Timestamps;
import org.joda.time.Instant;
/**
@@ -76,7 +76,8 @@ public class SDFFeederViaStateAndTimers<InputT, RestrictionT> {
private WatermarkHoldState holdState;
private Instant inputTimestamp;
- private BundleSplit split;
+ private List<BundleApplication> primaryRoots;
+ private List<DelayedBundleApplication> residualRoots;
/** Initializes the feeder. */
public SDFFeederViaStateAndTimers(
@@ -119,7 +120,7 @@ public class SDFFeederViaStateAndTimers<InputT, RestrictionT> {
* and sets a wake-up timer if a checkpoint happened.
*/
public void commit() throws IOException {
- if (split == null) {
+ if (primaryRoots == null) {
// No split - the call terminated.
seedState.clear();
restrictionState.clear();
@@ -128,9 +129,8 @@ public class SDFFeederViaStateAndTimers<InputT, RestrictionT> {
}
// For now can only happen on the first instruction which is SPLITTABLE_PROCESS_ELEMENTS.
- List<DelayedApplication> residuals = split.getResidualRootsList();
- checkArgument(residuals.size() == 1, "More than 1 residual is unsupported for now");
- DelayedApplication residual = residuals.get(0);
+ checkArgument(residualRoots.size() == 1, "More than 1 residual is unsupported for now");
+ DelayedBundleApplication residual = residualRoots.get(0);
ByteString encodedResidual = residual.getApplication().getElement();
WindowedValue<KV<InputT, RestrictionT>> decodedResidual =
@@ -151,8 +151,12 @@ public class SDFFeederViaStateAndTimers<InputT, RestrictionT> {
inputTimestamp);
holdState.add(watermarkHold);
- Duration resumeDelay = Duration.millis((long) (1000L * residual.getDelaySec()));
- Instant wakeupTime = timerInternals.currentProcessingTime().plus(resumeDelay);
+ Instant requestedWakeupTime =
+ new Instant(Timestamps.toMillis(residual.getRequestedExecutionTime()));
+ Instant wakeupTime =
+ timerInternals.currentProcessingTime().isBefore(requestedWakeupTime)
+ ? requestedWakeupTime
+ : timerInternals.currentProcessingTime();
// Set a timer to continue processing this element.
timerInternals.setTimer(
@@ -160,13 +164,18 @@ public class SDFFeederViaStateAndTimers<InputT, RestrictionT> {
}
/** Signals that a split happened. */
- public void split(BundleSplit split) {
+ public void split(
+ List<BundleApplication> primaryRoots, List<DelayedBundleApplication> residualRoots) {
checkState(
- this.split == null,
- "At most 1 split supported, however got new split %s in addition to existing %s",
- split,
- this.split);
- this.split = split;
+ this.primaryRoots == null,
+ "At most 1 split supported, however got new split (%s, %s) "
+ + "in addition to existing (%s, %s)",
+ primaryRoots,
+ residualRoots,
+ this.primaryRoots,
+ this.residualRoots);
+ this.primaryRoots = primaryRoots;
+ this.residualRoots = residualRoots;
}
private void initState(StateNamespace ns) {
diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
index 3bbe04e..69a604f 100644
--- a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
+++ b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
@@ -263,7 +263,7 @@ public class RemoteExecutionTest implements Serializable {
// The impulse example
try (ActiveBundle bundle =
- processor.newBundle(outputReceivers, BundleProgressHandler.unsupported())) {
+ processor.newBundle(outputReceivers, BundleProgressHandler.ignored())) {
Iterables.getOnlyElement(bundle.getInputReceivers().values())
.accept(WindowedValue.valueInGlobalWindow(new byte[0]));
}
@@ -374,7 +374,7 @@ public class RemoteExecutionTest implements Serializable {
};
}
});
- BundleProgressHandler progressHandler = BundleProgressHandler.unsupported();
+ BundleProgressHandler progressHandler = BundleProgressHandler.ignored();
try (ActiveBundle bundle =
processor.newBundle(outputReceivers, stateRequestHandler, progressHandler)) {
@@ -528,7 +528,7 @@ public class RemoteExecutionTest implements Serializable {
try (ActiveBundle bundle =
processor.newBundle(
- outputReceivers, stateRequestHandler, BundleProgressHandler.unsupported())) {
+ outputReceivers, stateRequestHandler, BundleProgressHandler.ignored())) {
Iterables.getOnlyElement(bundle.getInputReceivers().values())
.accept(WindowedValue.valueInGlobalWindow(kvBytes("X", "Y")));
}
@@ -671,9 +671,7 @@ public class RemoteExecutionTest implements Serializable {
try (ActiveBundle bundle =
processor.newBundle(
- outputReceivers,
- StateRequestHandler.unsupported(),
- BundleProgressHandler.unsupported())) {
+ outputReceivers, StateRequestHandler.unsupported(), BundleProgressHandler.ignored())) {
bundle
.getInputReceivers()
.get(stage.getInputPCollection().getId())
@@ -794,7 +792,7 @@ public class RemoteExecutionTest implements Serializable {
processor.newBundle(
outputReceivers,
StateRequestHandler.unsupported(),
- BundleProgressHandler.unsupported())) {
+ BundleProgressHandler.ignored())) {
bundle
.getInputReceivers()
.get(stage.getInputPCollection().getId())
diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClientTest.java b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClientTest.java
index 82b734c..f875b78 100644
--- a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClientTest.java
+++ b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClientTest.java
@@ -236,7 +236,7 @@ public class SdkHarnessClientTest {
when(dataService.send(any(), eq(coder))).thenReturn(mock(CloseableFnDataReceiver.class));
try (ActiveBundle activeBundle =
- processor.newBundle(Collections.emptyMap(), BundleProgressHandler.unsupported())) {
+ processor.newBundle(Collections.emptyMap(), BundleProgressHandler.ignored())) {
// Correlating the ProcessBundleRequest and ProcessBundleResponse is owned by the underlying
// FnApiControlClient. The SdkHarnessClient owns just wrapping the request and unwrapping
// the response.
@@ -271,7 +271,7 @@ public class SdkHarnessClientTest {
FullWindowedValueCoder.of(
LengthPrefixCoder.of(StringUtf8Coder.of()), Coder.INSTANCE),
outputs::add)),
- BundleProgressHandler.unsupported())) {
+ BundleProgressHandler.ignored())) {
FnDataReceiver<WindowedValue<?>> bundleInputReceiver =
Iterables.getOnlyElement(activeBundle.getInputReceivers().values());
bundleInputReceiver.accept(WindowedValue.valueInGlobalWindow("foo"));
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/SplittableProcessElementsRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/SplittableProcessElementsRunner.java
index 6ade4bb..92a966f 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/SplittableProcessElementsRunner.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/SplittableProcessElementsRunner.java
@@ -30,8 +30,8 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.beam.fn.harness.DoFnPTransformRunnerFactory.Context;
import org.apache.beam.fn.harness.state.FnApiStateAccessor;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.BundleSplit.Application;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.BundleSplit.DelayedApplication;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.BundleApplication;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.DelayedBundleApplication;
import org.apache.beam.runners.core.OutputAndTimeBoundedSplittableProcessElementInvoker;
import org.apache.beam.runners.core.OutputWindowedValue;
import org.apache.beam.runners.core.SplittableProcessElementInvoker;
@@ -53,6 +53,7 @@ import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.vendor.protobuf.v3.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.protobuf.v3.com.google.protobuf.util.Timestamps;
import org.joda.time.Duration;
import org.joda.time.Instant;
@@ -227,14 +228,14 @@ public class SplittableProcessElementsRunner<InputT, RestrictionT, OutputT>
} catch (IOException e) {
throw new RuntimeException(e);
}
- Application primaryApplication =
- Application.newBuilder()
+ BundleApplication primaryApplication =
+ BundleApplication.newBuilder()
.setPtransformId(context.ptransformId)
.setInputId(mainInputId)
.setElement(primaryBytes.toByteString())
.build();
- Application residualApplication =
- Application.newBuilder()
+ BundleApplication residualApplication =
+ BundleApplication.newBuilder()
.setPtransformId(context.ptransformId)
.setInputId(mainInputId)
.setElement(residualBytes.toByteString())
@@ -242,9 +243,12 @@ public class SplittableProcessElementsRunner<InputT, RestrictionT, OutputT>
context.splitListener.split(
ImmutableList.of(primaryApplication),
ImmutableList.of(
- DelayedApplication.newBuilder()
+ DelayedBundleApplication.newBuilder()
.setApplication(residualApplication)
- .setDelaySec(0.001 * result.getContinuation().resumeDelay().getMillis())
+ .setRequestedExecutionTime(
+ Timestamps.fromMillis(
+ System.currentTimeMillis()
+ + result.getContinuation().resumeDelay().getMillis()))
.build()));
}
}
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/BundleSplitListener.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/BundleSplitListener.java
index 5e6ba70..9eab245 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/BundleSplitListener.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/BundleSplitListener.java
@@ -18,8 +18,8 @@
package org.apache.beam.fn.harness.control;
import java.util.List;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.BundleSplit.Application;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.BundleSplit.DelayedApplication;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.BundleApplication;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.DelayedBundleApplication;
/**
* Listens to splits happening to a single bundle. See <a
@@ -36,5 +36,5 @@ public interface BundleSplitListener {
* are a decomposition of work that has been given away by the bundle, so the runner must delegate
* it for someone else to execute.
*/
- void split(List<Application> primaryRoots, List<DelayedApplication> residualRoots);
+ void split(List<BundleApplication> primaryRoots, List<DelayedBundleApplication> residualRoots);
}
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
index 9b9ed6c..547a859 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
@@ -44,9 +44,8 @@ import org.apache.beam.fn.harness.data.BeamFnDataClient;
import org.apache.beam.fn.harness.state.BeamFnStateClient;
import org.apache.beam.fn.harness.state.BeamFnStateGrpcClientCache;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.BundleSplit;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.BundleSplit.Application;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.BundleSplit.DelayedApplication;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.BundleApplication;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.DelayedBundleApplication;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleDescriptor;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleRequest;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleResponse;
@@ -236,19 +235,19 @@ public class ProcessBundleHandler {
beamFnStateGrpcClientCache.forApiServiceDescriptor(
bundleDescriptor.getStateApiServiceDescriptor()))
: new FailAllStateCallsForBundle(request.getProcessBundle())) {
- Multimap<String, Application> allPrimaries = ArrayListMultimap.create();
- Multimap<String, DelayedApplication> allResiduals = ArrayListMultimap.create();
+ Multimap<String, BundleApplication> allPrimaries = ArrayListMultimap.create();
+ Multimap<String, DelayedBundleApplication> allResiduals = ArrayListMultimap.create();
BundleSplitListener splitListener =
- (List<Application> primaries, List<DelayedApplication> residuals) -> {
+ (List<BundleApplication> primaries, List<DelayedBundleApplication> residuals) -> {
// Reset primaries and accumulate residuals.
- Multimap<String, Application> newPrimaries = ArrayListMultimap.create();
- for (Application primary : primaries) {
+ Multimap<String, BundleApplication> newPrimaries = ArrayListMultimap.create();
+ for (BundleApplication primary : primaries) {
newPrimaries.put(primary.getPtransformId(), primary);
}
allPrimaries.clear();
allPrimaries.putAll(newPrimaries);
- for (DelayedApplication residual : residuals) {
+ for (DelayedBundleApplication residual : residuals) {
allResiduals.put(residual.getApplication().getPtransformId(), residual);
}
};
@@ -290,12 +289,8 @@ public class ProcessBundleHandler {
LOG.debug("Finishing function {}", finishFunction);
finishFunction.run();
}
- if (!allPrimaries.isEmpty()) {
- response.setSplit(
- BundleSplit.newBuilder()
- .addAllPrimaryRoots(allPrimaries.values())
- .addAllResidualRoots(allResiduals.values())
- .build());
+ if (!allResiduals.isEmpty()) {
+ response.addAllResidualRoots(allResiduals.values());
}
}