You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2020/03/02 22:24:20 UTC
[beam] branch master updated: [BEAM-9397] Pass all but output receiver parameters to start bundle/finish bundle methods.
This is an automated email from the ASF dual-hosted git repository.
lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 360c4fe [BEAM-9397] Pass all but output receiver parameters to start bundle/finish bundle methods.
new a167255 Merge pull request #10989 from lukecwik/beam9397
360c4fe is described below
commit 360c4fe092413bbe9fb16ebfbe2d2e39fe31cb73
Author: Luke Cwik <lc...@google.com>
AuthorDate: Thu Feb 27 10:05:00 2020 -0800
[BEAM-9397] Pass all but output receiver parameters to start bundle/finish bundle methods.
The remaining work is covered by BEAM-1287.
---
.../construction/SplittableParDoNaiveBounded.java | 14 +-
.../apache/beam/runners/core/SimpleDoFnRunner.java | 308 +++------------------
.../core/SplittableParDoViaKeyedWorkItems.java | 12 +-
.../java/org/apache/beam/sdk/transforms/DoFn.java | 6 +
.../org/apache/beam/sdk/transforms/DoFnTester.java | 10 +
.../sdk/transforms/reflect/DoFnSignatures.java | 8 +-
.../sdk/transforms/reflect/DoFnSignaturesTest.java | 17 +-
.../apache/beam/fn/harness/FnApiDoFnRunner.java | 10 +
8 files changed, 113 insertions(+), 272 deletions(-)
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDoNaiveBounded.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDoNaiveBounded.java
index 30a44da..92a443a 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDoNaiveBounded.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDoNaiveBounded.java
@@ -146,6 +146,11 @@ public class SplittableParDoNaiveBounded {
}
@Override
+ public PipelineOptions pipelineOptions() {
+ return c.getPipelineOptions();
+ }
+
+ @Override
public String getErrorContext() {
return "SplittableParDoNaiveBounded/StartBundle";
}
@@ -200,19 +205,24 @@ public class SplittableParDoNaiveBounded {
public void output(
@Nullable OutputT output, Instant timestamp, BoundedWindow window) {
throw new UnsupportedOperationException(
- "Output from FinishBundle for SDF is not supported");
+ "Output from FinishBundle for SDF is not supported in naive implementation");
}
@Override
public <T> void output(
TupleTag<T> tag, T output, Instant timestamp, BoundedWindow window) {
throw new UnsupportedOperationException(
- "Output from FinishBundle for SDF is not supported");
+ "Output from FinishBundle for SDF is not supported in naive implementation");
}
};
}
@Override
+ public PipelineOptions pipelineOptions() {
+ return c.getPipelineOptions();
+ }
+
+ @Override
public String getErrorContext() {
return "SplittableParDoNaiveBounded/StartBundle";
}
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index 71efa12..a37644a 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -168,7 +168,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
public void startBundle() {
// This can contain user code. Wrap it in case it throws an exception.
try {
- invoker.invokeStartBundle(new DoFnStartBundleContext());
+ invoker.invokeStartBundle(new DoFnStartBundleArgumentProvider());
} catch (Throwable t) {
// Exception in user code.
throw wrapUserCodeException(t);
@@ -231,7 +231,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
public void finishBundle() {
// This can contain user code. Wrap it in case it throws an exception.
try {
- invoker.invokeFinishBundle(new DoFnFinishBundleContext());
+ invoker.invokeFinishBundle(new DoFnFinishBundleArgumentProvider());
} catch (Throwable t) {
// Exception in user code.
throw wrapUserCodeException(t);
@@ -258,298 +258,80 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
outputManager.output(tag, windowedElem);
}
- /** A concrete implementation of {@link DoFn.StartBundleContext}. */
- private class DoFnStartBundleContext extends DoFn<InputT, OutputT>.StartBundleContext
- implements DoFnInvoker.ArgumentProvider<InputT, OutputT> {
- private DoFnStartBundleContext() {
- fn.super();
- }
+ /** An {@link DoFnInvoker.ArgumentProvider} for {@link DoFn.StartBundle @StartBundle}. */
+ private class DoFnStartBundleArgumentProvider
+ extends DoFnInvoker.BaseArgumentProvider<InputT, OutputT> {
+ /** A concrete implementation of {@link DoFn.StartBundleContext}. */
+ private class Context extends DoFn<InputT, OutputT>.StartBundleContext {
+ private Context() {
+ fn.super();
+ }
- @Override
- public PipelineOptions getPipelineOptions() {
- return options;
+ @Override
+ public PipelineOptions getPipelineOptions() {
+ return options;
+ }
}
- @Override
- public BoundedWindow window() {
- throw new UnsupportedOperationException(
- "Cannot access window outside of @ProcessElement and @OnTimer methods.");
- }
-
- @Override
- public PaneInfo paneInfo(DoFn<InputT, OutputT> doFn) {
- throw new UnsupportedOperationException(
- "Cannot access paneInfo outside of @ProcessElement methods.");
- }
+ private final Context context = new Context();
@Override
public PipelineOptions pipelineOptions() {
- return getPipelineOptions();
+ return options;
}
@Override
public DoFn<InputT, OutputT>.StartBundleContext startBundleContext(DoFn<InputT, OutputT> doFn) {
- return this;
- }
-
- @Override
- public DoFn<InputT, OutputT>.FinishBundleContext finishBundleContext(
- DoFn<InputT, OutputT> doFn) {
- throw new UnsupportedOperationException(
- "Cannot access FinishBundleContext outside of @FinishBundle method.");
- }
-
- @Override
- public DoFn<InputT, OutputT>.ProcessContext processContext(DoFn<InputT, OutputT> doFn) {
- throw new UnsupportedOperationException(
- "Cannot access ProcessContext outside of @ProcessElement method.");
- }
-
- @Override
- public InputT element(DoFn<InputT, OutputT> doFn) {
- throw new UnsupportedOperationException(
- "Element parameters are not supported outside of @ProcessElement method.");
- }
-
- @Override
- public InputT sideInput(String tagId) {
- throw new UnsupportedOperationException(
- "SideInput parameters are not supported outside of @ProcessElement method.");
- }
-
- @Override
- public Object schemaElement(int index) {
- throw new UnsupportedOperationException(
- "Element parameters are not supported outside of @ProcessElement method.");
- }
-
- @Override
- public Instant timestamp(DoFn<InputT, OutputT> doFn) {
- throw new UnsupportedOperationException(
- "Cannot access timestamp outside of @ProcessElement method.");
- }
-
- @Override
- public String timerId(DoFn<InputT, OutputT> doFn) {
- throw new UnsupportedOperationException("Cannot access timerId outside of @OnTimer method.");
- }
-
- @Override
- public TimeDomain timeDomain(DoFn<InputT, OutputT> doFn) {
- throw new UnsupportedOperationException(
- "Cannot access time domain outside of @ProcessTimer method.");
+ return context;
}
@Override
- public OutputReceiver<OutputT> outputReceiver(DoFn<InputT, OutputT> doFn) {
- throw new UnsupportedOperationException(
- "Cannot access output receiver outside of @ProcessElement method.");
- }
-
- @Override
- public OutputReceiver<Row> outputRowReceiver(DoFn<InputT, OutputT> doFn) {
- throw new UnsupportedOperationException(
- "Cannot access output receiver outside of @ProcessElement method.");
- }
-
- @Override
- public MultiOutputReceiver taggedOutputReceiver(DoFn<InputT, OutputT> doFn) {
- throw new UnsupportedOperationException(
- "Cannot access output receiver outside of @ProcessElement method.");
- }
-
- @Override
- public Object restriction() {
- throw new UnsupportedOperationException("@Restriction parameters are not supported.");
- }
-
- @Override
- public DoFn<InputT, OutputT>.OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) {
- throw new UnsupportedOperationException(
- "Cannot access OnTimerContext outside of @OnTimer methods.");
- }
-
- @Override
- public RestrictionTracker<?, ?> restrictionTracker() {
- throw new UnsupportedOperationException(
- "Cannot access RestrictionTracker outside of @ProcessElement method.");
- }
-
- @Override
- public State state(String stateId, boolean alwaysFetched) {
- throw new UnsupportedOperationException(
- "Cannot access state outside of @ProcessElement and @OnTimer methods.");
- }
-
- @Override
- public Timer timer(String timerId) {
- throw new UnsupportedOperationException(
- "Cannot access timers outside of @ProcessElement and @OnTimer methods.");
- }
-
- @Override
- public TimerMap timerFamily(String tagId) {
- throw new UnsupportedOperationException(
- "Cannot access timer family outside of @ProcessElement and @OnTimer methods");
- }
-
- @Override
- public BundleFinalizer bundleFinalizer() {
- throw new UnsupportedOperationException(
- "Bundle finalization is not supported in non-portable pipelines.");
+ public String getErrorContext() {
+ return "SimpleDoFnRunner/StartBundle";
}
}
- /** B A concrete implementation of {@link DoFn.FinishBundleContext}. */
- private class DoFnFinishBundleContext extends DoFn<InputT, OutputT>.FinishBundleContext
- implements DoFnInvoker.ArgumentProvider<InputT, OutputT> {
- private DoFnFinishBundleContext() {
- fn.super();
- }
+ /** An {@link DoFnInvoker.ArgumentProvider} for {@link DoFn.FinishBundle @FinishBundle}. */
+ private class DoFnFinishBundleArgumentProvider
+ extends DoFnInvoker.BaseArgumentProvider<InputT, OutputT> {
+ /** A concrete implementation of {@link DoFn.FinishBundleContext}. */
+ private class Context extends DoFn<InputT, OutputT>.FinishBundleContext {
+ private Context() {
+ fn.super();
+ }
- @Override
- public PipelineOptions getPipelineOptions() {
- return options;
- }
+ @Override
+ public PipelineOptions getPipelineOptions() {
+ return options;
+ }
- @Override
- public BoundedWindow window() {
- throw new UnsupportedOperationException(
- "Cannot access window outside of @ProcessElement and @OnTimer methods.");
- }
+ @Override
+ public void output(OutputT output, Instant timestamp, BoundedWindow window) {
+ output(mainOutputTag, output, timestamp, window);
+ }
- @Override
- public PaneInfo paneInfo(DoFn<InputT, OutputT> doFn) {
- throw new UnsupportedOperationException(
- "Cannot access paneInfo outside of @ProcessElement methods.");
+ @Override
+ public <T> void output(TupleTag<T> tag, T output, Instant timestamp, BoundedWindow window) {
+ outputWindowedValue(tag, WindowedValue.of(output, timestamp, window, PaneInfo.NO_FIRING));
+ }
}
- @Override
- public PipelineOptions pipelineOptions() {
- return getPipelineOptions();
- }
+ private final Context context = new Context();
@Override
- public DoFn<InputT, OutputT>.StartBundleContext startBundleContext(DoFn<InputT, OutputT> doFn) {
- throw new UnsupportedOperationException(
- "Cannot access StartBundleContext outside of @StartBundle method.");
+ public PipelineOptions pipelineOptions() {
+ return options;
}
@Override
public DoFn<InputT, OutputT>.FinishBundleContext finishBundleContext(
DoFn<InputT, OutputT> doFn) {
- return this;
+ return context;
}
@Override
- public DoFn<InputT, OutputT>.ProcessContext processContext(DoFn<InputT, OutputT> doFn) {
- throw new UnsupportedOperationException(
- "Cannot access ProcessContext outside of @ProcessElement method.");
- }
-
- @Override
- public InputT element(DoFn<InputT, OutputT> doFn) {
- throw new UnsupportedOperationException(
- "Cannot access element outside of @ProcessElement method.");
- }
-
- @Override
- public InputT sideInput(String tagId) {
- throw new UnsupportedOperationException(
- "Cannot access sideInput outside of @ProcessElement method.");
- }
-
- @Override
- public Object schemaElement(int index) {
- throw new UnsupportedOperationException(
- "Cannot access element outside of @ProcessElement method.");
- }
-
- @Override
- public Instant timestamp(DoFn<InputT, OutputT> doFn) {
- throw new UnsupportedOperationException(
- "Cannot access timestamp outside of @ProcessElement method.");
- }
-
- @Override
- public String timerId(DoFn<InputT, OutputT> doFn) {
- throw new UnsupportedOperationException(
- "Cannot access timerId as parameter outside of @OnTimer method.");
- }
-
- @Override
- public TimeDomain timeDomain(DoFn<InputT, OutputT> doFn) {
- throw new UnsupportedOperationException(
- "Cannot access time domain outside of @ProcessTimer method.");
- }
-
- @Override
- public OutputReceiver<OutputT> outputReceiver(DoFn<InputT, OutputT> doFn) {
- throw new UnsupportedOperationException(
- "Cannot access outputReceiver in @FinishBundle method.");
- }
-
- @Override
- public OutputReceiver<Row> outputRowReceiver(DoFn<InputT, OutputT> doFn) {
- throw new UnsupportedOperationException(
- "Cannot access outputReceiver in @FinishBundle method.");
- }
-
- @Override
- public MultiOutputReceiver taggedOutputReceiver(DoFn<InputT, OutputT> doFn) {
- throw new UnsupportedOperationException(
- "Cannot access outputReceiver in @FinishBundle method.");
- }
-
- @Override
- public Object restriction() {
- throw new UnsupportedOperationException("@Restriction parameters are not supported.");
- }
-
- @Override
- public DoFn<InputT, OutputT>.OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) {
- throw new UnsupportedOperationException(
- "Cannot access OnTimerContext outside of @OnTimer methods.");
- }
-
- @Override
- public RestrictionTracker<?, ?> restrictionTracker() {
- throw new UnsupportedOperationException(
- "Cannot access RestrictionTracker outside of @ProcessElement method.");
- }
-
- @Override
- public State state(String stateId, boolean alwaysFetched) {
- throw new UnsupportedOperationException(
- "Cannot access state outside of @ProcessElement and @OnTimer methods.");
- }
-
- @Override
- public Timer timer(String timerId) {
- throw new UnsupportedOperationException(
- "Cannot access timers outside of @ProcessElement and @OnTimer methods.");
- }
-
- @Override
- public TimerMap timerFamily(String tagId) {
- throw new UnsupportedOperationException(
- "Cannot access timerFamily outside of @ProcessElement and @OnTimer methods.");
- }
-
- @Override
- public void output(OutputT output, Instant timestamp, BoundedWindow window) {
- output(mainOutputTag, output, timestamp, window);
- }
-
- @Override
- public <T> void output(TupleTag<T> tag, T output, Instant timestamp, BoundedWindow window) {
- outputWindowedValue(tag, WindowedValue.of(output, timestamp, window, PaneInfo.NO_FIRING));
- }
-
- @Override
- public BundleFinalizer bundleFinalizer() {
- throw new UnsupportedOperationException(
- "Bundle finalization is not supported in non-portable pipelines.");
+ public String getErrorContext() {
+ return "SimpleDoFnRunner/FinishBundle";
}
}
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
index dc795a55..28277f9 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
@@ -432,6 +432,11 @@ public class SplittableParDoViaKeyedWorkItems {
}
@Override
+ public PipelineOptions pipelineOptions() {
+ return baseContext.getPipelineOptions();
+ }
+
+ @Override
public String getErrorContext() {
return "SplittableParDoViaKeyedWorkItems/StartBundle";
}
@@ -464,13 +469,18 @@ public class SplittableParDoViaKeyedWorkItems {
private void throwUnsupportedOutput() {
throw new UnsupportedOperationException(
String.format(
- "Splittable DoFn can only output from @%s",
+ "KWI Splittable DoFn can only output from @%s",
ProcessElement.class.getSimpleName()));
}
};
}
@Override
+ public PipelineOptions pipelineOptions() {
+ return baseContext.getPipelineOptions();
+ }
+
+ @Override
public String getErrorContext() {
return "SplittableParDoViaKeyedWorkItems/FinishBundle";
}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
index 9500599..d00950d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
@@ -592,6 +592,8 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
* <ul>
* <li>If one of the parameters is of type {@link DoFn.StartBundleContext}, then it will be
* passed a context object for the current execution.
+ * <li>If one of the parameters is of type {@link PipelineOptions}, then it will be passed the
+ * options for the current pipeline.
* <li>If one of the parameters is of type {@link BundleFinalizer}, then it will be passed a
* mechanism to register a callback that will be invoked after the runner successfully
* commits the output of this bundle. See <a
@@ -810,11 +812,15 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
* <ul>
* <li>If one of the parameters is of type {@link DoFn.FinishBundleContext}, then it will be
* passed a context object for the current execution.
+ * <li>If one of the parameters is of type {@link PipelineOptions}, then it will be passed the
+ * options for the current pipeline.
* <li>If one of the parameters is of type {@link BundleFinalizer}, then it will be passed a
* mechanism to register a callback that will be invoked after the runner successfully
* commits the output of this bundle. See <a
* href="https://s.apache.org/beam-finalizing-bundles">Apache Beam Portability API: How to
* Finalize Bundles</a> for further details.
+ * <li>TODO(BEAM-1287): Add support for an {@link OutputReceiver} and {@link
+ * MultiOutputReceiver} that can output to a window.
* </ul>
*
* <p>Note that {@link FinishBundle @FinishBundle} is invoked before the runner commits the output
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
index ba6bc17..112046a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
@@ -482,6 +482,11 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
}
@Override
+ public PipelineOptions pipelineOptions() {
+ return options;
+ }
+
+ @Override
public String getErrorContext() {
return "DoFnTester/StartBundle";
}
@@ -510,6 +515,11 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
}
@Override
+ public PipelineOptions pipelineOptions() {
+ return options;
+ }
+
+ @Override
public String getErrorContext() {
return "DoFnTester/FinishBundle";
}
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
index d34baba..fb76330 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
@@ -132,11 +132,15 @@ public class DoFnSignatures {
private static final ImmutableList<Class<? extends Parameter>> ALLOWED_START_BUNDLE_PARAMETERS =
ImmutableList.of(
- Parameter.StartBundleContextParameter.class, Parameter.BundleFinalizerParameter.class);
+ Parameter.PipelineOptionsParameter.class,
+ Parameter.StartBundleContextParameter.class,
+ Parameter.BundleFinalizerParameter.class);
private static final ImmutableList<Class<? extends Parameter>> ALLOWED_FINISH_BUNDLE_PARAMETERS =
ImmutableList.of(
- Parameter.FinishBundleContextParameter.class, Parameter.BundleFinalizerParameter.class);
+ Parameter.PipelineOptionsParameter.class,
+ Parameter.FinishBundleContextParameter.class,
+ Parameter.BundleFinalizerParameter.class);
private static final ImmutableList<Class<? extends Parameter>> ALLOWED_ON_TIMER_PARAMETERS =
ImmutableList.of(
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
index 24e581b..1414371 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
@@ -53,6 +53,7 @@ import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.state.WatermarkHoldState;
import org.apache.beam.sdk.testing.SerializableMatchers;
import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.MultiOutputReceiver;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.BundleFinalizerParameter;
@@ -362,13 +363,17 @@ public class DoFnSignaturesTest {
@StartBundle
public void startBundle(
- StartBundleContext context, BundleFinalizer bundleFinalizer) {}
+ StartBundleContext context,
+ BundleFinalizer bundleFinalizer,
+ PipelineOptions options) {}
}.getClass());
- assertThat(sig.startBundle().extraParameters().size(), equalTo(2));
+ assertThat(sig.startBundle().extraParameters().size(), equalTo(3));
assertThat(
sig.startBundle().extraParameters().get(0), instanceOf(StartBundleContextParameter.class));
assertThat(
sig.startBundle().extraParameters().get(1), instanceOf(BundleFinalizerParameter.class));
+ assertThat(
+ sig.startBundle().extraParameters().get(2), instanceOf(PipelineOptionsParameter.class));
}
@Test
@@ -397,14 +402,18 @@ public class DoFnSignaturesTest {
@FinishBundle
public void finishBundle(
- FinishBundleContext context, BundleFinalizer bundleFinalizer) {}
+ FinishBundleContext context,
+ BundleFinalizer bundleFinalizer,
+ PipelineOptions pipelineOptions) {}
}.getClass());
- assertThat(sig.finishBundle().extraParameters().size(), equalTo(2));
+ assertThat(sig.finishBundle().extraParameters().size(), equalTo(3));
assertThat(
sig.finishBundle().extraParameters().get(0),
instanceOf(FinishBundleContextParameter.class));
assertThat(
sig.finishBundle().extraParameters().get(1), instanceOf(BundleFinalizerParameter.class));
+ assertThat(
+ sig.finishBundle().extraParameters().get(2), instanceOf(PipelineOptionsParameter.class));
}
@Test
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
index d0f70d5..224abf1 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
@@ -945,6 +945,11 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, OutputT> {
}
@Override
+ public PipelineOptions pipelineOptions() {
+ return pipelineOptions;
+ }
+
+ @Override
public BundleFinalizer bundleFinalizer() {
return bundleFinalizer;
}
@@ -992,6 +997,11 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, OutputT> {
}
@Override
+ public PipelineOptions pipelineOptions() {
+ return pipelineOptions;
+ }
+
+ @Override
public BundleFinalizer bundleFinalizer() {
return bundleFinalizer;
}