Posted to commits@beam.apache.org by "Kenneth Knowles (JIRA)" <ji...@apache.org> on 2018/01/30 05:01:00 UTC

[jira] [Comment Edited] (BEAM-3194) Support annotating that a DoFn requires stable / deterministic input for replay/retry

    [ https://issues.apache.org/jira/browse/BEAM-3194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16287091#comment-16287091 ] 

Kenneth Knowles edited comment on BEAM-3194 at 1/30/18 5:00 AM:
----------------------------------------------------------------

kennknowles closed pull request #4135: [BEAM-3194] Add @RequiresStableInput annotation
URL: https://github.com/apache/beam/pull/4135


was (Author: githubbot):
kennknowles closed pull request #4135: [BEAM-3194] Add @RequiresStableInput annotation
URL: https://github.com/apache/beam/pull/4135
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
index 3e023db679d..9f8dd45d110 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
@@ -567,6 +567,29 @@ public Duration getAllowedTimestampSkew() {
   @Target(ElementType.METHOD)
   public @interface ProcessElement {}
 
+  /**
+   * <b><i>Experimental - no backwards compatibility guarantees. The exact name or usage of this
+   * feature may change.</i></b>
+   *
+   * <p>Annotation that may be added to a {@link ProcessElement} or {@link OnTimer} method to
+   * indicate that the runner must ensure that the observable contents of the input {@link
+   * PCollection} or mutable state are stable upon retries.
+   *
+   * <p>This is important for sinks, which must ensure exactly-once semantics when writing to a
+   * storage medium outside of your pipeline. A general pattern for a basic sink is to write a
+   * {@link DoFn} that can perform an idempotent write, and annotate that it requires stable input.
+   * Combined, these allow the write to be freely retried until success.
+   *
+   * <p>An example of an unstable input would be anything computed using nondeterministic logic. In
+   * Beam, any user-defined function is permitted to be nondeterministic, and any {@link
+   * PCollection} is permitted to be recomputed in any manner.
+   */
+  @Documented
+  @Experimental
+  @Retention(RetentionPolicy.RUNTIME)
+  @Target(ElementType.METHOD)
+  public @interface RequiresStableInput {}
+
   /**
    * Annotation for the method to use to finish processing a batch of elements.
    * The method annotated with this must satisfy the following constraints:
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
index bfad69ea776..1e126611452 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
@@ -426,6 +426,12 @@ public static TimerParameter timerParameter(TimerDeclaration decl) {
     @Override
     public abstract List<Parameter> extraParameters();
 
+    /**
+     * Whether this method requires stable input, expressed via {@link
+     * org.apache.beam.sdk.transforms.DoFn.RequiresStableInput}.
+     */
+    public abstract boolean requiresStableInput();
+
     /** Concrete type of the {@link RestrictionTracker} parameter, if present. */
     @Nullable
     public abstract TypeDescriptor<?> trackerT();
@@ -440,12 +446,14 @@ public static TimerParameter timerParameter(TimerDeclaration decl) {
     static ProcessElementMethod create(
         Method targetMethod,
         List<Parameter> extraParameters,
+        boolean requiresStableInput,
         TypeDescriptor<?> trackerT,
         @Nullable TypeDescriptor<? extends BoundedWindow> windowT,
         boolean hasReturnValue) {
       return new AutoValue_DoFnSignature_ProcessElementMethod(
           targetMethod,
           Collections.unmodifiableList(extraParameters),
+          requiresStableInput,
           trackerT,
           windowT,
           hasReturnValue);
@@ -487,6 +495,13 @@ public boolean isSplittable() {
     @Override
     public abstract Method targetMethod();
 
+    /**
+     * Whether this method requires stable input, expressed via {@link
+     * org.apache.beam.sdk.transforms.DoFn.RequiresStableInput}. For timers, this means that any
+     * state must be stably persisted prior to calling it.
+     */
+    public abstract boolean requiresStableInput();
+
     /** The window type used by this method, if any. */
     @Nullable
     public abstract TypeDescriptor<? extends BoundedWindow> windowT();
@@ -498,10 +513,15 @@ public boolean isSplittable() {
     static OnTimerMethod create(
         Method targetMethod,
         String id,
+        boolean requiresStableInput,
         TypeDescriptor<? extends BoundedWindow> windowT,
         List<Parameter> extraParameters) {
       return new AutoValue_DoFnSignature_OnTimerMethod(
-          id, targetMethod, windowT, Collections.unmodifiableList(extraParameters));
+          id,
+          targetMethod,
+          requiresStableInput,
+          windowT,
+          Collections.unmodifiableList(extraParameters));
     }
   }
 
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
index 52607833f71..98742be3cbb 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
@@ -679,6 +679,8 @@ private static void verifyUnsplittableMethods(ErrorReporter errors, DoFnSignatur
 
     MethodAnalysisContext methodContext = MethodAnalysisContext.create();
 
+    boolean requiresStableInput = m.isAnnotationPresent(DoFn.RequiresStableInput.class);
+
     @Nullable TypeDescriptor<? extends BoundedWindow> windowT = getWindowType(fnClass, m);
 
     List<DoFnSignature.Parameter> extraParameters = new ArrayList<>();
@@ -706,7 +708,8 @@ private static void verifyUnsplittableMethods(ErrorReporter errors, DoFnSignatur
       extraParameters.add(parameter);
     }
 
-    return DoFnSignature.OnTimerMethod.create(m, timerId, windowT, extraParameters);
+    return DoFnSignature.OnTimerMethod.create(
+        m, timerId, requiresStableInput, windowT, extraParameters);
   }
 
   @VisibleForTesting
@@ -723,9 +726,10 @@ private static void verifyUnsplittableMethods(ErrorReporter errors, DoFnSignatur
         "Must return void or %s",
         DoFn.ProcessContinuation.class.getSimpleName());
 
-
     MethodAnalysisContext methodContext = MethodAnalysisContext.create();
 
+    boolean requiresStableInput = m.isAnnotationPresent(DoFn.RequiresStableInput.class);
+
     Type[] params = m.getGenericParameterTypes();
 
     TypeDescriptor<?> trackerT = getTrackerType(fnClass, m);
@@ -763,6 +767,7 @@ private static void verifyUnsplittableMethods(ErrorReporter errors, DoFnSignatur
     return DoFnSignature.ProcessElementMethod.create(
         m,
         methodContext.getExtraParameters(),
+        requiresStableInput,
         trackerT,
         windowT,
         DoFn.ProcessContinuation.class.equals(m.getReturnType()));
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
index a961203ffed..d7b5cad48c8 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
@@ -22,6 +22,7 @@
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.not;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
@@ -77,6 +78,17 @@ public void process(ProcessContext c) {}
         sig.processElement().extraParameters().get(0), instanceOf(ProcessContextParameter.class));
   }
 
+  @Test
+  public void testRequiresStableInputProcessElement() throws Exception {
+    DoFnSignature sig = DoFnSignatures.getSignature(new DoFn<String, String>() {
+      @ProcessElement
+      @RequiresStableInput
+      public void process(ProcessContext c) {}
+    }.getClass());
+
+    assertThat(sig.processElement().requiresStableInput(), is(true));
+  }
+
   @Test
   public void testBadExtraContext() throws Exception {
     thrown.expect(IllegalArgumentException.class);


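The detection step in the diff above relies on `Method.isAnnotationPresent`, which only sees annotations declared with `RetentionPolicy.RUNTIME`. The following is a minimal, self-contained sketch of that mechanism. It does not use Beam: the nested `RequiresStableInput` annotation, the `AnnotatedFn` and `PlainFn` classes, and the `requiresStableInput` helper are hypothetical stand-ins introduced only for illustration.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

final class AnnotationDetectionSketch {

  // Hypothetical stand-in for DoFn.RequiresStableInput; not the Beam annotation itself.
  // RUNTIME retention is what makes the annotation visible to reflection.
  @Retention(RetentionPolicy.RUNTIME)
  @Target(ElementType.METHOD)
  @interface RequiresStableInput {}

  // Hypothetical user function whose processing method carries the annotation.
  static class AnnotatedFn {
    @RequiresStableInput
    public void process(String element) {}
  }

  // Hypothetical user function without the annotation.
  static class PlainFn {
    public void process(String element) {}
  }

  // Same kind of check as in the diff: look up the method, then ask whether
  // the annotation is present on it.
  static boolean requiresStableInput(Class<?> fnClass) {
    try {
      Method m = fnClass.getMethod("process", String.class);
      return m.isAnnotationPresent(RequiresStableInput.class);
    } catch (NoSuchMethodException e) {
      throw new IllegalArgumentException("No process(String) method on " + fnClass, e);
    }
  }

  public static void main(String[] args) {
    System.out.println(requiresStableInput(AnnotatedFn.class)); // prints "true"
    System.out.println(requiresStableInput(PlainFn.class)); // prints "false"
  }
}
```

In this sketch the boolean is returned directly; in the diff it is stored on the method signature object so that a runner can consult it later.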
 



> Support annotating that a DoFn requires stable / deterministic input for replay/retry
> -------------------------------------------------------------------------------------
>
>                 Key: BEAM-3194
>                 URL: https://issues.apache.org/jira/browse/BEAM-3194
>             Project: Beam
>          Issue Type: New Feature
>          Components: beam-model
>            Reporter: Kenneth Knowles
>            Assignee: Eugene Kirpichov
>            Priority: Major
>
> See the thread: https://lists.apache.org/thread.html/5fd81ce371aeaf642665348f8e6940e308e04275dd7072f380f9f945@%3Cdev.beam.apache.org%3E
> We need this in order to have truly cross-runner end-to-end exactly once via replay + idempotence.
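Why replay plus idempotence needs stable input can be illustrated with a small simulation. This is a sketch under stated assumptions, not Beam code: `KeyedStore`, `writeWithRetries`, `stableInput`, and `unstableInput` are hypothetical names, and an incrementing counter stands in for nondeterministic logic such as generating a random record ID. The store's `put` is idempotent per key, so retries are harmless only if each retry sees the same keys.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

final class IdempotentWriteSketch {

  // Hypothetical external store: writing the same (key, value) twice leaves one row.
  static final class KeyedStore {
    private final Map<String, String> rows = new HashMap<>();

    void put(String key, String value) {
      rows.put(key, value);
    }

    int size() {
      return rows.size();
    }
  }

  // Simulates retries: the input is obtained again on every attempt and every
  // record is written again. Returns the number of rows left in the store.
  static int writeWithRetries(Supplier<List<String[]>> input, int attempts) {
    KeyedStore store = new KeyedStore();
    for (int attempt = 0; attempt < attempts; attempt++) {
      for (String[] record : input.get()) {
        store.put(record[0], record[1]);
      }
    }
    return store.size();
  }

  // Stable input: materialized once, so every attempt sees identical records.
  static Supplier<List<String[]>> stableInput() {
    List<String[]> materialized =
        List.of(new String[] {"id-a", "x"}, new String[] {"id-b", "y"});
    return () -> materialized;
  }

  // Unstable input: recomputed on each attempt, and the recomputation assigns
  // fresh IDs (the counter is a stand-in for nondeterministic ID generation).
  static Supplier<List<String[]>> unstableInput() {
    AtomicInteger nextId = new AtomicInteger();
    return () -> {
      List<String[]> recomputed = new ArrayList<>();
      for (String value : List.of("x", "y")) {
        recomputed.add(new String[] {"id-" + nextId.incrementAndGet(), value});
      }
      return recomputed;
    };
  }

  public static void main(String[] args) {
    System.out.println("stable input rows: " + writeWithRetries(stableInput(), 3)); // 2
    System.out.println("unstable input rows: " + writeWithRetries(unstableInput(), 3)); // 6
  }
}
```

With stable input, three attempts leave two rows, as if the write had happened once. With unstable input, the same idempotent write leaves six rows, because each retry presents different keys.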


