You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2020/03/02 22:24:20 UTC

[beam] branch master updated: [BEAM-9397] Pass all but output receiver parameters to start bundle/finish bundle methods.

This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 360c4fe  [BEAM-9397] Pass all but output receiver parameters to start bundle/finish bundle methods.
     new a167255  Merge pull request #10989 from lukecwik/beam9397
360c4fe is described below

commit 360c4fe092413bbe9fb16ebfbe2d2e39fe31cb73
Author: Luke Cwik <lc...@google.com>
AuthorDate: Thu Feb 27 10:05:00 2020 -0800

    [BEAM-9397] Pass all but output receiver parameters to start bundle/finish bundle methods.
    
    The remaining work is covered by BEAM-1287.
---
 .../construction/SplittableParDoNaiveBounded.java  |  14 +-
 .../apache/beam/runners/core/SimpleDoFnRunner.java | 308 +++------------------
 .../core/SplittableParDoViaKeyedWorkItems.java     |  12 +-
 .../java/org/apache/beam/sdk/transforms/DoFn.java  |   6 +
 .../org/apache/beam/sdk/transforms/DoFnTester.java |  10 +
 .../sdk/transforms/reflect/DoFnSignatures.java     |   8 +-
 .../sdk/transforms/reflect/DoFnSignaturesTest.java |  17 +-
 .../apache/beam/fn/harness/FnApiDoFnRunner.java    |  10 +
 8 files changed, 113 insertions(+), 272 deletions(-)

diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDoNaiveBounded.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDoNaiveBounded.java
index 30a44da..92a443a 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDoNaiveBounded.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDoNaiveBounded.java
@@ -146,6 +146,11 @@ public class SplittableParDoNaiveBounded {
             }
 
             @Override
+            public PipelineOptions pipelineOptions() {
+              return c.getPipelineOptions();
+            }
+
+            @Override
             public String getErrorContext() {
               return "SplittableParDoNaiveBounded/StartBundle";
             }
@@ -200,19 +205,24 @@ public class SplittableParDoNaiveBounded {
                 public void output(
                     @Nullable OutputT output, Instant timestamp, BoundedWindow window) {
                   throw new UnsupportedOperationException(
-                      "Output from FinishBundle for SDF is not supported");
+                      "Output from FinishBundle for SDF is not supported in naive implementation");
                 }
 
                 @Override
                 public <T> void output(
                     TupleTag<T> tag, T output, Instant timestamp, BoundedWindow window) {
                   throw new UnsupportedOperationException(
-                      "Output from FinishBundle for SDF is not supported");
+                      "Output from FinishBundle for SDF is not supported in naive implementation");
                 }
               };
             }
 
             @Override
+            public PipelineOptions pipelineOptions() {
+              return c.getPipelineOptions();
+            }
+
+            @Override
             public String getErrorContext() {
               return "SplittableParDoNaiveBounded/StartBundle";
             }
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index 71efa12..a37644a 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -168,7 +168,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
   public void startBundle() {
     // This can contain user code. Wrap it in case it throws an exception.
     try {
-      invoker.invokeStartBundle(new DoFnStartBundleContext());
+      invoker.invokeStartBundle(new DoFnStartBundleArgumentProvider());
     } catch (Throwable t) {
       // Exception in user code.
       throw wrapUserCodeException(t);
@@ -231,7 +231,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
   public void finishBundle() {
     // This can contain user code. Wrap it in case it throws an exception.
     try {
-      invoker.invokeFinishBundle(new DoFnFinishBundleContext());
+      invoker.invokeFinishBundle(new DoFnFinishBundleArgumentProvider());
     } catch (Throwable t) {
       // Exception in user code.
       throw wrapUserCodeException(t);
@@ -258,298 +258,80 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
     outputManager.output(tag, windowedElem);
   }
 
-  /** A concrete implementation of {@link DoFn.StartBundleContext}. */
-  private class DoFnStartBundleContext extends DoFn<InputT, OutputT>.StartBundleContext
-      implements DoFnInvoker.ArgumentProvider<InputT, OutputT> {
-    private DoFnStartBundleContext() {
-      fn.super();
-    }
+  /** An {@link DoFnInvoker.ArgumentProvider} for {@link DoFn.StartBundle @StartBundle}. */
+  private class DoFnStartBundleArgumentProvider
+      extends DoFnInvoker.BaseArgumentProvider<InputT, OutputT> {
+    /** A concrete implementation of {@link DoFn.StartBundleContext}. */
+    private class Context extends DoFn<InputT, OutputT>.StartBundleContext {
+      private Context() {
+        fn.super();
+      }
 
-    @Override
-    public PipelineOptions getPipelineOptions() {
-      return options;
+      @Override
+      public PipelineOptions getPipelineOptions() {
+        return options;
+      }
     }
 
-    @Override
-    public BoundedWindow window() {
-      throw new UnsupportedOperationException(
-          "Cannot access window outside of @ProcessElement and @OnTimer methods.");
-    }
-
-    @Override
-    public PaneInfo paneInfo(DoFn<InputT, OutputT> doFn) {
-      throw new UnsupportedOperationException(
-          "Cannot access paneInfo outside of @ProcessElement methods.");
-    }
+    private final Context context = new Context();
 
     @Override
     public PipelineOptions pipelineOptions() {
-      return getPipelineOptions();
+      return options;
     }
 
     @Override
     public DoFn<InputT, OutputT>.StartBundleContext startBundleContext(DoFn<InputT, OutputT> doFn) {
-      return this;
-    }
-
-    @Override
-    public DoFn<InputT, OutputT>.FinishBundleContext finishBundleContext(
-        DoFn<InputT, OutputT> doFn) {
-      throw new UnsupportedOperationException(
-          "Cannot access FinishBundleContext outside of @FinishBundle method.");
-    }
-
-    @Override
-    public DoFn<InputT, OutputT>.ProcessContext processContext(DoFn<InputT, OutputT> doFn) {
-      throw new UnsupportedOperationException(
-          "Cannot access ProcessContext outside of @ProcessElement method.");
-    }
-
-    @Override
-    public InputT element(DoFn<InputT, OutputT> doFn) {
-      throw new UnsupportedOperationException(
-          "Element parameters are not supported outside of @ProcessElement method.");
-    }
-
-    @Override
-    public InputT sideInput(String tagId) {
-      throw new UnsupportedOperationException(
-          "SideInput parameters are not supported outside of @ProcessElement method.");
-    }
-
-    @Override
-    public Object schemaElement(int index) {
-      throw new UnsupportedOperationException(
-          "Element parameters are not supported outside of @ProcessElement method.");
-    }
-
-    @Override
-    public Instant timestamp(DoFn<InputT, OutputT> doFn) {
-      throw new UnsupportedOperationException(
-          "Cannot access timestamp outside of @ProcessElement method.");
-    }
-
-    @Override
-    public String timerId(DoFn<InputT, OutputT> doFn) {
-      throw new UnsupportedOperationException("Cannot access timerId outside of @OnTimer method.");
-    }
-
-    @Override
-    public TimeDomain timeDomain(DoFn<InputT, OutputT> doFn) {
-      throw new UnsupportedOperationException(
-          "Cannot access time domain outside of @ProcessTimer method.");
+      return context;
     }
 
     @Override
-    public OutputReceiver<OutputT> outputReceiver(DoFn<InputT, OutputT> doFn) {
-      throw new UnsupportedOperationException(
-          "Cannot access output receiver outside of @ProcessElement method.");
-    }
-
-    @Override
-    public OutputReceiver<Row> outputRowReceiver(DoFn<InputT, OutputT> doFn) {
-      throw new UnsupportedOperationException(
-          "Cannot access output receiver outside of @ProcessElement method.");
-    }
-
-    @Override
-    public MultiOutputReceiver taggedOutputReceiver(DoFn<InputT, OutputT> doFn) {
-      throw new UnsupportedOperationException(
-          "Cannot access output receiver outside of @ProcessElement method.");
-    }
-
-    @Override
-    public Object restriction() {
-      throw new UnsupportedOperationException("@Restriction parameters are not supported.");
-    }
-
-    @Override
-    public DoFn<InputT, OutputT>.OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) {
-      throw new UnsupportedOperationException(
-          "Cannot access OnTimerContext outside of @OnTimer methods.");
-    }
-
-    @Override
-    public RestrictionTracker<?, ?> restrictionTracker() {
-      throw new UnsupportedOperationException(
-          "Cannot access RestrictionTracker outside of @ProcessElement method.");
-    }
-
-    @Override
-    public State state(String stateId, boolean alwaysFetched) {
-      throw new UnsupportedOperationException(
-          "Cannot access state outside of @ProcessElement and @OnTimer methods.");
-    }
-
-    @Override
-    public Timer timer(String timerId) {
-      throw new UnsupportedOperationException(
-          "Cannot access timers outside of @ProcessElement and @OnTimer methods.");
-    }
-
-    @Override
-    public TimerMap timerFamily(String tagId) {
-      throw new UnsupportedOperationException(
-          "Cannot access timer family outside of @ProcessElement and @OnTimer methods");
-    }
-
-    @Override
-    public BundleFinalizer bundleFinalizer() {
-      throw new UnsupportedOperationException(
-          "Bundle finalization is not supported in non-portable pipelines.");
+    public String getErrorContext() {
+      return "SimpleDoFnRunner/StartBundle";
     }
   }
 
-  /** B A concrete implementation of {@link DoFn.FinishBundleContext}. */
-  private class DoFnFinishBundleContext extends DoFn<InputT, OutputT>.FinishBundleContext
-      implements DoFnInvoker.ArgumentProvider<InputT, OutputT> {
-    private DoFnFinishBundleContext() {
-      fn.super();
-    }
+  /** An {@link DoFnInvoker.ArgumentProvider} for {@link DoFn.FinishBundle @FinishBundle}. */
+  private class DoFnFinishBundleArgumentProvider
+      extends DoFnInvoker.BaseArgumentProvider<InputT, OutputT> {
+    /** A concrete implementation of {@link DoFn.FinishBundleContext}. */
+    private class Context extends DoFn<InputT, OutputT>.FinishBundleContext {
+      private Context() {
+        fn.super();
+      }
 
-    @Override
-    public PipelineOptions getPipelineOptions() {
-      return options;
-    }
+      @Override
+      public PipelineOptions getPipelineOptions() {
+        return options;
+      }
 
-    @Override
-    public BoundedWindow window() {
-      throw new UnsupportedOperationException(
-          "Cannot access window outside of @ProcessElement and @OnTimer methods.");
-    }
+      @Override
+      public void output(OutputT output, Instant timestamp, BoundedWindow window) {
+        output(mainOutputTag, output, timestamp, window);
+      }
 
-    @Override
-    public PaneInfo paneInfo(DoFn<InputT, OutputT> doFn) {
-      throw new UnsupportedOperationException(
-          "Cannot access paneInfo outside of @ProcessElement methods.");
+      @Override
+      public <T> void output(TupleTag<T> tag, T output, Instant timestamp, BoundedWindow window) {
+        outputWindowedValue(tag, WindowedValue.of(output, timestamp, window, PaneInfo.NO_FIRING));
+      }
     }
 
-    @Override
-    public PipelineOptions pipelineOptions() {
-      return getPipelineOptions();
-    }
+    private final Context context = new Context();
 
     @Override
-    public DoFn<InputT, OutputT>.StartBundleContext startBundleContext(DoFn<InputT, OutputT> doFn) {
-      throw new UnsupportedOperationException(
-          "Cannot access StartBundleContext outside of @StartBundle method.");
+    public PipelineOptions pipelineOptions() {
+      return options;
     }
 
     @Override
     public DoFn<InputT, OutputT>.FinishBundleContext finishBundleContext(
         DoFn<InputT, OutputT> doFn) {
-      return this;
+      return context;
     }
 
     @Override
-    public DoFn<InputT, OutputT>.ProcessContext processContext(DoFn<InputT, OutputT> doFn) {
-      throw new UnsupportedOperationException(
-          "Cannot access ProcessContext outside of @ProcessElement method.");
-    }
-
-    @Override
-    public InputT element(DoFn<InputT, OutputT> doFn) {
-      throw new UnsupportedOperationException(
-          "Cannot access element outside of @ProcessElement method.");
-    }
-
-    @Override
-    public InputT sideInput(String tagId) {
-      throw new UnsupportedOperationException(
-          "Cannot access sideInput outside of @ProcessElement method.");
-    }
-
-    @Override
-    public Object schemaElement(int index) {
-      throw new UnsupportedOperationException(
-          "Cannot access element outside of @ProcessElement method.");
-    }
-
-    @Override
-    public Instant timestamp(DoFn<InputT, OutputT> doFn) {
-      throw new UnsupportedOperationException(
-          "Cannot access timestamp outside of @ProcessElement method.");
-    }
-
-    @Override
-    public String timerId(DoFn<InputT, OutputT> doFn) {
-      throw new UnsupportedOperationException(
-          "Cannot access timerId as parameter outside of @OnTimer method.");
-    }
-
-    @Override
-    public TimeDomain timeDomain(DoFn<InputT, OutputT> doFn) {
-      throw new UnsupportedOperationException(
-          "Cannot access time domain outside of @ProcessTimer method.");
-    }
-
-    @Override
-    public OutputReceiver<OutputT> outputReceiver(DoFn<InputT, OutputT> doFn) {
-      throw new UnsupportedOperationException(
-          "Cannot access outputReceiver in @FinishBundle method.");
-    }
-
-    @Override
-    public OutputReceiver<Row> outputRowReceiver(DoFn<InputT, OutputT> doFn) {
-      throw new UnsupportedOperationException(
-          "Cannot access outputReceiver in @FinishBundle method.");
-    }
-
-    @Override
-    public MultiOutputReceiver taggedOutputReceiver(DoFn<InputT, OutputT> doFn) {
-      throw new UnsupportedOperationException(
-          "Cannot access outputReceiver in @FinishBundle method.");
-    }
-
-    @Override
-    public Object restriction() {
-      throw new UnsupportedOperationException("@Restriction parameters are not supported.");
-    }
-
-    @Override
-    public DoFn<InputT, OutputT>.OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) {
-      throw new UnsupportedOperationException(
-          "Cannot access OnTimerContext outside of @OnTimer methods.");
-    }
-
-    @Override
-    public RestrictionTracker<?, ?> restrictionTracker() {
-      throw new UnsupportedOperationException(
-          "Cannot access RestrictionTracker outside of @ProcessElement method.");
-    }
-
-    @Override
-    public State state(String stateId, boolean alwaysFetched) {
-      throw new UnsupportedOperationException(
-          "Cannot access state outside of @ProcessElement and @OnTimer methods.");
-    }
-
-    @Override
-    public Timer timer(String timerId) {
-      throw new UnsupportedOperationException(
-          "Cannot access timers outside of @ProcessElement and @OnTimer methods.");
-    }
-
-    @Override
-    public TimerMap timerFamily(String tagId) {
-      throw new UnsupportedOperationException(
-          "Cannot access timerFamily outside of @ProcessElement and @OnTimer methods.");
-    }
-
-    @Override
-    public void output(OutputT output, Instant timestamp, BoundedWindow window) {
-      output(mainOutputTag, output, timestamp, window);
-    }
-
-    @Override
-    public <T> void output(TupleTag<T> tag, T output, Instant timestamp, BoundedWindow window) {
-      outputWindowedValue(tag, WindowedValue.of(output, timestamp, window, PaneInfo.NO_FIRING));
-    }
-
-    @Override
-    public BundleFinalizer bundleFinalizer() {
-      throw new UnsupportedOperationException(
-          "Bundle finalization is not supported in non-portable pipelines.");
+    public String getErrorContext() {
+      return "SimpleDoFnRunner/FinishBundle";
     }
   }
 
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
index dc795a55..28277f9 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
@@ -432,6 +432,11 @@ public class SplittableParDoViaKeyedWorkItems {
         }
 
         @Override
+        public PipelineOptions pipelineOptions() {
+          return baseContext.getPipelineOptions();
+        }
+
+        @Override
         public String getErrorContext() {
           return "SplittableParDoViaKeyedWorkItems/StartBundle";
         }
@@ -464,13 +469,18 @@ public class SplittableParDoViaKeyedWorkItems {
             private void throwUnsupportedOutput() {
               throw new UnsupportedOperationException(
                   String.format(
-                      "Splittable DoFn can only output from @%s",
+                      "KWI Splittable DoFn can only output from @%s",
                       ProcessElement.class.getSimpleName()));
             }
           };
         }
 
         @Override
+        public PipelineOptions pipelineOptions() {
+          return baseContext.getPipelineOptions();
+        }
+
+        @Override
         public String getErrorContext() {
           return "SplittableParDoViaKeyedWorkItems/FinishBundle";
         }
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
index 9500599..d00950d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
@@ -592,6 +592,8 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
    * <ul>
    *   <li>If one of the parameters is of type {@link DoFn.StartBundleContext}, then it will be
    *       passed a context object for the current execution.
+   *   <li>If one of the parameters is of type {@link PipelineOptions}, then it will be passed the
+   *       options for the current pipeline.
    *   <li>If one of the parameters is of type {@link BundleFinalizer}, then it will be passed a
    *       mechanism to register a callback that will be invoked after the runner successfully
    *       commits the output of this bundle. See <a
@@ -810,11 +812,15 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
    * <ul>
    *   <li>If one of the parameters is of type {@link DoFn.FinishBundleContext}, then it will be
    *       passed a context object for the current execution.
+   *   <li>If one of the parameters is of type {@link PipelineOptions}, then it will be passed the
+   *       options for the current pipeline.
    *   <li>If one of the parameters is of type {@link BundleFinalizer}, then it will be passed a
    *       mechanism to register a callback that will be invoked after the runner successfully
    *       commits the output of this bundle. See <a
    *       href="https://s.apache.org/beam-finalizing-bundles">Apache Beam Portability API: How to
    *       Finalize Bundles</a> for further details.
+   *   <li>TODO(BEAM-1287): Add support for an {@link OutputReceiver} and {@link
+   *       MultiOutputReceiver} that can output to a window.
    * </ul>
    *
    * <p>Note that {@link FinishBundle @FinishBundle} is invoked before the runner commits the output
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
index ba6bc17..112046a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
@@ -482,6 +482,11 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
     }
 
     @Override
+    public PipelineOptions pipelineOptions() {
+      return options;
+    }
+
+    @Override
     public String getErrorContext() {
       return "DoFnTester/StartBundle";
     }
@@ -510,6 +515,11 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
     }
 
     @Override
+    public PipelineOptions pipelineOptions() {
+      return options;
+    }
+
+    @Override
     public String getErrorContext() {
       return "DoFnTester/FinishBundle";
     }
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
index d34baba..fb76330 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
@@ -132,11 +132,15 @@ public class DoFnSignatures {
 
   private static final ImmutableList<Class<? extends Parameter>> ALLOWED_START_BUNDLE_PARAMETERS =
       ImmutableList.of(
-          Parameter.StartBundleContextParameter.class, Parameter.BundleFinalizerParameter.class);
+          Parameter.PipelineOptionsParameter.class,
+          Parameter.StartBundleContextParameter.class,
+          Parameter.BundleFinalizerParameter.class);
 
   private static final ImmutableList<Class<? extends Parameter>> ALLOWED_FINISH_BUNDLE_PARAMETERS =
       ImmutableList.of(
-          Parameter.FinishBundleContextParameter.class, Parameter.BundleFinalizerParameter.class);
+          Parameter.PipelineOptionsParameter.class,
+          Parameter.FinishBundleContextParameter.class,
+          Parameter.BundleFinalizerParameter.class);
 
   private static final ImmutableList<Class<? extends Parameter>> ALLOWED_ON_TIMER_PARAMETERS =
       ImmutableList.of(
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
index 24e581b..1414371 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
@@ -53,6 +53,7 @@ import org.apache.beam.sdk.state.ValueState;
 import org.apache.beam.sdk.state.WatermarkHoldState;
 import org.apache.beam.sdk.testing.SerializableMatchers;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.MultiOutputReceiver;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.BundleFinalizerParameter;
@@ -362,13 +363,17 @@ public class DoFnSignaturesTest {
 
               @StartBundle
               public void startBundle(
-                  StartBundleContext context, BundleFinalizer bundleFinalizer) {}
+                  StartBundleContext context,
+                  BundleFinalizer bundleFinalizer,
+                  PipelineOptions options) {}
             }.getClass());
-    assertThat(sig.startBundle().extraParameters().size(), equalTo(2));
+    assertThat(sig.startBundle().extraParameters().size(), equalTo(3));
     assertThat(
         sig.startBundle().extraParameters().get(0), instanceOf(StartBundleContextParameter.class));
     assertThat(
         sig.startBundle().extraParameters().get(1), instanceOf(BundleFinalizerParameter.class));
+    assertThat(
+        sig.startBundle().extraParameters().get(2), instanceOf(PipelineOptionsParameter.class));
   }
 
   @Test
@@ -397,14 +402,18 @@ public class DoFnSignaturesTest {
 
               @FinishBundle
               public void finishBundle(
-                  FinishBundleContext context, BundleFinalizer bundleFinalizer) {}
+                  FinishBundleContext context,
+                  BundleFinalizer bundleFinalizer,
+                  PipelineOptions pipelineOptions) {}
             }.getClass());
-    assertThat(sig.finishBundle().extraParameters().size(), equalTo(2));
+    assertThat(sig.finishBundle().extraParameters().size(), equalTo(3));
     assertThat(
         sig.finishBundle().extraParameters().get(0),
         instanceOf(FinishBundleContextParameter.class));
     assertThat(
         sig.finishBundle().extraParameters().get(1), instanceOf(BundleFinalizerParameter.class));
+    assertThat(
+        sig.finishBundle().extraParameters().get(2), instanceOf(PipelineOptionsParameter.class));
   }
 
   @Test
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
index d0f70d5..224abf1 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
@@ -945,6 +945,11 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, OutputT> {
     }
 
     @Override
+    public PipelineOptions pipelineOptions() {
+      return pipelineOptions;
+    }
+
+    @Override
     public BundleFinalizer bundleFinalizer() {
       return bundleFinalizer;
     }
@@ -992,6 +997,11 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, OutputT> {
     }
 
     @Override
+    public PipelineOptions pipelineOptions() {
+      return pipelineOptions;
+    }
+
+    @Override
     public BundleFinalizer bundleFinalizer() {
       return bundleFinalizer;
     }