You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/04/07 23:06:19 UTC

[1/3] beam git commit: [BEAM-65] Adds HasDefaultTracker for RestrictionTracker inference

Repository: beam
Updated Branches:
  refs/heads/master 4fd8dc401 -> 4a694cebb


[BEAM-65] Adds HasDefaultTracker for RestrictionTracker inference

Allows a restriction type to implement HasDefaultTracker,
in that case the splittable DoFn itself does not need to
implement NewTracker - only ProcessElement and GetInitialRestriction.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/03a99b4d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/03a99b4d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/03a99b4d

Branch: refs/heads/master
Commit: 03a99b4df18ceeb7cd61953c461c4cca377a37ae
Parents: 8179d8c
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Apr 7 12:09:47 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Fri Apr 7 15:55:27 2017 -0700

----------------------------------------------------------------------
 ...ndedSplittableProcessElementInvokerTest.java |  5 --
 .../beam/runners/core/SplittableParDoTest.java  | 53 ++++++--------
 .../reflect/ByteBuddyDoFnInvokerFactory.java    | 24 ++++++-
 .../sdk/transforms/reflect/DoFnSignatures.java  | 74 +++++++++++++-------
 .../splittabledofn/HasDefaultTracker.java       | 30 ++++++++
 .../transforms/splittabledofn/OffsetRange.java  |  8 ++-
 .../beam/sdk/transforms/SplittableDoFnTest.java | 20 ------
 .../transforms/reflect/DoFnInvokersTest.java    | 54 +++++++++++---
 .../DoFnSignaturesSplittableDoFnTest.java       | 43 ++++++++++++
 9 files changed, 215 insertions(+), 96 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/03a99b4d/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java
index d7c9889..b85f481 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java
@@ -71,11 +71,6 @@ public class OutputAndTimeBoundedSplittableProcessElementInvokerTest {
     public OffsetRange getInitialRestriction(Integer element) {
       throw new UnsupportedOperationException("Should not be called in this test");
     }
-
-    @NewTracker
-    public OffsetRangeTracker newTracker(OffsetRange range) {
-      throw new UnsupportedOperationException("Should not be called in this test");
-    }
   }
 
   private SplittableProcessElementInvoker<Integer, String, OffsetRange, OffsetRangeTracker>.Result

http://git-wip-us.apache.org/repos/asf/beam/blob/03a99b4d/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
index ee94ee0..6205777 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
@@ -45,6 +45,7 @@ import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFnTester;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.splittabledofn.HasDefaultTracker;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
@@ -72,10 +73,20 @@ public class SplittableParDoTest {
   private static final Duration MAX_BUNDLE_DURATION = Duration.standardSeconds(5);
 
   // ----------------- Tests for whether the transform sets boundedness correctly --------------
-  private static class SomeRestriction implements Serializable {}
+  private static class SomeRestriction
+      implements Serializable, HasDefaultTracker<SomeRestriction, SomeRestrictionTracker> {
+    @Override
+    public SomeRestrictionTracker newTracker() {
+      return new SomeRestrictionTracker(this);
+    }
+  }
 
   private static class SomeRestrictionTracker implements RestrictionTracker<SomeRestriction> {
-    private final SomeRestriction someRestriction = new SomeRestriction();
+    private final SomeRestriction someRestriction;
+
+    public SomeRestrictionTracker(SomeRestriction someRestriction) {
+      this.someRestriction = someRestriction;
+    }
 
     @Override
     public SomeRestriction currentRestriction() {
@@ -96,11 +107,6 @@ public class SplittableParDoTest {
     public SomeRestriction getInitialRestriction(Integer element) {
       return null;
     }
-
-    @NewTracker
-    public SomeRestrictionTracker newTracker(SomeRestriction restriction) {
-      return null;
-    }
   }
 
   private static class UnboundedFakeFn extends DoFn<Integer, String> {
@@ -114,11 +120,6 @@ public class SplittableParDoTest {
     public SomeRestriction getInitialRestriction(Integer element) {
       return null;
     }
-
-    @NewTracker
-    public SomeRestrictionTracker newTracker(SomeRestriction restriction) {
-      return null;
-    }
   }
 
   private static PCollection<Integer> makeUnboundedCollection(Pipeline pipeline) {
@@ -376,11 +377,6 @@ public class SplittableParDoTest {
     public SomeRestriction getInitialRestriction(Integer elem) {
       return new SomeRestriction();
     }
-
-    @NewTracker
-    public SomeRestrictionTracker newTracker(SomeRestriction restriction) {
-      return new SomeRestrictionTracker();
-    }
   }
 
   @Test
@@ -438,11 +434,6 @@ public class SplittableParDoTest {
     public SomeRestriction getInitialRestriction(Integer elem) {
       return new SomeRestriction();
     }
-
-    @NewTracker
-    public SomeRestrictionTracker newTracker(SomeRestriction restriction) {
-      return new SomeRestrictionTracker();
-    }
   }
 
   @Test
@@ -474,12 +465,18 @@ public class SplittableParDoTest {
     assertThat(tester.takeOutputElements(), contains("42"));
   }
 
-  private static class SomeCheckpoint implements Serializable {
+  private static class SomeCheckpoint
+      implements Serializable, HasDefaultTracker<SomeCheckpoint, SomeCheckpointTracker> {
     private int firstUnprocessedIndex;
 
     private SomeCheckpoint(int firstUnprocessedIndex) {
       this.firstUnprocessedIndex = firstUnprocessedIndex;
     }
+
+    @Override
+    public SomeCheckpointTracker newTracker() {
+      return new SomeCheckpointTracker(this);
+    }
   }
 
   private static class SomeCheckpointTracker implements RestrictionTracker<SomeCheckpoint> {
@@ -543,11 +540,6 @@ public class SplittableParDoTest {
     public SomeCheckpoint getInitialRestriction(Integer elem) {
       throw new UnsupportedOperationException("Expected to be supplied explicitly in this test");
     }
-
-    @NewTracker
-    public SomeCheckpointTracker newTracker(SomeCheckpoint restriction) {
-      return new SomeCheckpointTracker(restriction);
-    }
   }
 
   @Test
@@ -658,11 +650,6 @@ public class SplittableParDoTest {
       return new SomeRestriction();
     }
 
-    @NewTracker
-    public SomeRestrictionTracker newTracker(SomeRestriction restriction) {
-      return new SomeRestrictionTracker();
-    }
-
     @Setup
     public void setup() {
       assertEquals(State.BEFORE_SETUP, state);

http://git-wip-us.apache.org/repos/asf/beam/blob/03a99b4d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
index 0155297..6746d3a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
@@ -74,6 +74,8 @@ import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.Restrictio
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.StateParameter;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.TimerParameter;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.WindowParameter;
+import org.apache.beam.sdk.transforms.splittabledofn.HasDefaultTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.util.Timer;
 import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.values.TypeDescriptor;
@@ -257,6 +259,15 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory {
     }
   }
 
+  /** Default implementation of {@link DoFn.NewTracker}, for delegation by bytebuddy. */
+  public static class DefaultNewTracker {
+    /** Uses {@link HasDefaultTracker} to produce the tracker. */
+    @SuppressWarnings("unused")
+    public static RestrictionTracker invokeNewTracker(Object restriction) {
+      return ((HasDefaultTracker) restriction).newTracker();
+    }
+  }
+
   /** Generates a {@link DoFnInvoker} class for the given {@link DoFnSignature}. */
   private static Class<? extends DoFnInvoker<?, ?>> generateInvokerClass(DoFnSignature signature) {
     Class<? extends DoFn<?, ?>> fnClass = signature.fnClass();
@@ -306,7 +317,7 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory {
             .method(ElementMatchers.named("invokeGetRestrictionCoder"))
             .intercept(getRestrictionCoderDelegation(clazzDescription, signature))
             .method(ElementMatchers.named("invokeNewTracker"))
-            .intercept(delegateWithDowncastOrThrow(clazzDescription, signature.newTracker()));
+            .intercept(newTrackerDelegation(clazzDescription, signature.newTracker()));
 
     DynamicType.Unloaded<?> unloaded = builder.make();
 
@@ -346,6 +357,17 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory {
     }
   }
 
+  private static Implementation newTrackerDelegation(
+      TypeDescription doFnType, @Nullable DoFnSignature.NewTrackerMethod signature) {
+    if (signature == null) {
+      // We must have already verified that in this case the restriction type
+      // is a subtype of HasDefaultTracker.
+      return MethodDelegation.to(DefaultNewTracker.class);
+    } else {
+      return delegateWithDowncastOrThrow(doFnType, signature);
+    }
+  }
+
   /** Delegates to the given method if available, or does nothing. */
   private static Implementation delegateOrNoop(
       TypeDescription doFnType, DoFnSignature.DoFnMethod method) {

http://git-wip-us.apache.org/repos/asf/beam/blob/03a99b4d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
index 61b9157..006d012 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
@@ -54,6 +54,7 @@ import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.TimerParam
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.WindowParameter;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature.StateDeclaration;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature.TimerDeclaration;
+import org.apache.beam.sdk.transforms.splittabledofn.HasDefaultTracker;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.Timer;
@@ -368,42 +369,35 @@ public class DoFnSignatures {
               errors.forMethod(DoFn.Teardown.class, teardownMethod), teardownMethod));
     }
 
-    DoFnSignature.GetInitialRestrictionMethod getInitialRestriction = null;
-    ErrorReporter getInitialRestrictionErrors = null;
+    ErrorReporter getInitialRestrictionErrors;
     if (getInitialRestrictionMethod != null) {
       getInitialRestrictionErrors =
           errors.forMethod(DoFn.GetInitialRestriction.class, getInitialRestrictionMethod);
       signatureBuilder.setGetInitialRestriction(
-          getInitialRestriction =
               analyzeGetInitialRestrictionMethod(
                   getInitialRestrictionErrors, fnT, getInitialRestrictionMethod, inputT));
     }
 
-    DoFnSignature.SplitRestrictionMethod splitRestriction = null;
     if (splitRestrictionMethod != null) {
       ErrorReporter splitRestrictionErrors =
           errors.forMethod(DoFn.SplitRestriction.class, splitRestrictionMethod);
       signatureBuilder.setSplitRestriction(
-          splitRestriction =
               analyzeSplitRestrictionMethod(
                   splitRestrictionErrors, fnT, splitRestrictionMethod, inputT));
     }
 
-    DoFnSignature.GetRestrictionCoderMethod getRestrictionCoder = null;
     if (getRestrictionCoderMethod != null) {
       ErrorReporter getRestrictionCoderErrors =
           errors.forMethod(DoFn.GetRestrictionCoder.class, getRestrictionCoderMethod);
       signatureBuilder.setGetRestrictionCoder(
-          getRestrictionCoder =
               analyzeGetRestrictionCoderMethod(
                   getRestrictionCoderErrors, fnT, getRestrictionCoderMethod));
     }
 
-    DoFnSignature.NewTrackerMethod newTracker = null;
     if (newTrackerMethod != null) {
       ErrorReporter newTrackerErrors = errors.forMethod(DoFn.NewTracker.class, newTrackerMethod);
       signatureBuilder.setNewTracker(
-          newTracker = analyzeNewTrackerMethod(newTrackerErrors, fnT, newTrackerMethod));
+          analyzeNewTrackerMethod(newTrackerErrors, fnT, newTrackerMethod));
     }
 
     signatureBuilder.setIsBoundedPerElement(inferBoundedness(fnT, processElement, errors));
@@ -501,38 +495,66 @@ public class DoFnSignatures {
     ErrorReporter processElementErrors =
         errors.forMethod(DoFn.ProcessElement.class, processElement.targetMethod());
 
+    final TypeDescriptor<?> trackerT;
+    final String originOfTrackerT;
+
     List<String> missingRequiredMethods = new ArrayList<>();
     if (getInitialRestriction == null) {
       missingRequiredMethods.add("@" + DoFn.GetInitialRestriction.class.getSimpleName());
     }
     if (newTracker == null) {
-      missingRequiredMethods.add("@" + DoFn.NewTracker.class.getSimpleName());
+      if (getInitialRestriction != null
+          && getInitialRestriction
+              .restrictionT()
+              .isSubtypeOf(TypeDescriptor.of(HasDefaultTracker.class))) {
+        trackerT =
+            getInitialRestriction
+                .restrictionT()
+                .resolveType(HasDefaultTracker.class.getTypeParameters()[1]);
+        originOfTrackerT =
+            String.format(
+                "restriction type %s of @%s method %s",
+                formatType(getInitialRestriction.restrictionT()),
+                DoFn.GetInitialRestriction.class.getSimpleName(),
+                format(getInitialRestriction.targetMethod()));
+      } else {
+        missingRequiredMethods.add("@" + DoFn.NewTracker.class.getSimpleName());
+        trackerT = null;
+        originOfTrackerT = null;
+      }
+    } else {
+      trackerT = newTracker.trackerT();
+      originOfTrackerT =
+          String.format(
+              "%s method %s",
+              DoFn.NewTracker.class.getSimpleName(), format(newTracker.targetMethod()));
+      ErrorReporter getInitialRestrictionErrors =
+          errors.forMethod(DoFn.GetInitialRestriction.class, getInitialRestriction.targetMethod());
+      TypeDescriptor<?> restrictionT = getInitialRestriction.restrictionT();
+      getInitialRestrictionErrors.checkArgument(
+          restrictionT.equals(newTracker.restrictionT()),
+          "Uses restriction type %s, but @%s method %s uses restriction type %s",
+          formatType(restrictionT),
+          DoFn.NewTracker.class.getSimpleName(),
+          format(newTracker.targetMethod()),
+          formatType(newTracker.restrictionT()));
     }
+
     if (!missingRequiredMethods.isEmpty()) {
       processElementErrors.throwIllegalArgument(
           "Splittable, but does not define the following required methods: %s",
           missingRequiredMethods);
     }
 
-    processElementErrors.checkArgument(
-        processElement.trackerT().equals(newTracker.trackerT()),
-        "Has tracker type %s, but @%s method %s uses tracker type %s",
-        formatType(processElement.trackerT()),
-        DoFn.NewTracker.class.getSimpleName(),
-        format(newTracker.targetMethod()),
-        formatType(newTracker.trackerT()));
-
     ErrorReporter getInitialRestrictionErrors =
         errors.forMethod(DoFn.GetInitialRestriction.class, getInitialRestriction.targetMethod());
     TypeDescriptor<?> restrictionT = getInitialRestriction.restrictionT();
-
-    getInitialRestrictionErrors.checkArgument(
-        restrictionT.equals(newTracker.restrictionT()),
-        "Uses restriction type %s, but @%s method %s uses restriction type %s",
-        formatType(restrictionT),
-        DoFn.NewTracker.class.getSimpleName(),
-        format(newTracker.targetMethod()),
-        formatType(newTracker.restrictionT()));
+    processElementErrors.checkArgument(
+        processElement.trackerT().equals(trackerT),
+        "Has tracker type %s, but the DoFn's tracker type was inferred as %s from %s",
+        formatType(processElement.trackerT()),
+        trackerT,
+        originOfTrackerT);
 
     if (getRestrictionCoder != null) {
       getInitialRestrictionErrors.checkArgument(

http://git-wip-us.apache.org/repos/asf/beam/blob/03a99b4d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/HasDefaultTracker.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/HasDefaultTracker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/HasDefaultTracker.java
new file mode 100644
index 0000000..3366dfe
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/HasDefaultTracker.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.splittabledofn;
+
+/**
+ * Interface for restrictions for which a default implementation of {@link
+ * org.apache.beam.sdk.transforms.DoFn.NewTracker} is available, depending only on the restriction
+ * itself.
+ */
+public interface HasDefaultTracker<
+    RestrictionT extends HasDefaultTracker<RestrictionT, TrackerT>,
+    TrackerT extends RestrictionTracker<RestrictionT>> {
+  /** Creates a new tracker for {@code this}. */
+  TrackerT newTracker();
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/03a99b4d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRange.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRange.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRange.java
index 67031c4..104f5f2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRange.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRange.java
@@ -22,7 +22,8 @@ import static com.google.common.base.Preconditions.checkArgument;
 import java.io.Serializable;
 
 /** A restriction represented by a range of integers [from, to). */
-public class OffsetRange implements Serializable {
+public class OffsetRange
+    implements Serializable, HasDefaultTracker<OffsetRange, OffsetRangeTracker> {
   private final long from;
   private final long to;
 
@@ -41,6 +42,11 @@ public class OffsetRange implements Serializable {
   }
 
   @Override
+  public OffsetRangeTracker newTracker() {
+    return new OffsetRangeTracker(this);
+  }
+
+  @Override
   public String toString() {
     return "[" + from + ", " + to + ')';
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/03a99b4d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
index d926f6c..154a088 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
@@ -88,11 +88,6 @@ public class SplittableDoFnTest {
       receiver.output(new OffsetRange(range.getFrom(), (range.getFrom() + range.getTo()) / 2));
       receiver.output(new OffsetRange((range.getFrom() + range.getTo()) / 2, range.getTo()));
     }
-
-    @NewTracker
-    public OffsetRangeTracker newTracker(OffsetRange range) {
-      return new OffsetRangeTracker(range);
-    }
   }
 
   private static class ReifyTimestampsFn<T> extends DoFn<T, TimestampedValue<T>> {
@@ -220,11 +215,6 @@ public class SplittableDoFnTest {
     public OffsetRange getInitialRange(String element) {
       return new OffsetRange(0, MAX_INDEX);
     }
-
-    @NewTracker
-    public OffsetRangeTracker newTracker(OffsetRange range) {
-      return new OffsetRangeTracker(range);
-    }
   }
 
   @Test
@@ -259,11 +249,6 @@ public class SplittableDoFnTest {
     public OffsetRange getInitialRestriction(Integer value) {
       return new OffsetRange(0, 1);
     }
-
-    @NewTracker
-    public OffsetRangeTracker newTracker(OffsetRange range) {
-      return new OffsetRangeTracker(range);
-    }
   }
 
   @Test
@@ -357,11 +342,6 @@ public class SplittableDoFnTest {
       return new OffsetRange(0, 1);
     }
 
-    @NewTracker
-    public OffsetRangeTracker newTracker(OffsetRange range) {
-      return new OffsetRangeTracker(range);
-    }
-
     @Setup
     public void setUp() {
       assertEquals(State.BEFORE_SETUP, state);

http://git-wip-us.apache.org/repos/asf/beam/blob/03a99b4d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
index 5d3746f..425f453 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
@@ -42,6 +42,7 @@ import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvoker.FakeArgumentProvider;
 import org.apache.beam.sdk.transforms.reflect.testhelper.DoFnInvokersTestHelper;
+import org.apache.beam.sdk.transforms.splittabledofn.HasDefaultTracker;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
@@ -391,19 +392,49 @@ public class DoFnInvokersTest {
             }));
   }
 
+  private static class RestrictionWithDefaultTracker
+      implements HasDefaultTracker<RestrictionWithDefaultTracker, DefaultTracker> {
+    @Override
+    public DefaultTracker newTracker() {
+      return new DefaultTracker();
+    }
+  }
+
+  private static class DefaultTracker implements RestrictionTracker<RestrictionWithDefaultTracker> {
+    @Override
+    public RestrictionWithDefaultTracker currentRestriction() {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public RestrictionWithDefaultTracker checkpoint() {
+      throw new UnsupportedOperationException();
+    }
+  }
+
+  private static class CoderForDefaultTracker extends CustomCoder<RestrictionWithDefaultTracker> {
+    public static CoderForDefaultTracker of() {
+      return new CoderForDefaultTracker();
+    }
+
+    @Override
+    public void encode(
+        RestrictionWithDefaultTracker value, OutputStream outStream, Context context) {}
+
+    @Override
+    public RestrictionWithDefaultTracker decode(InputStream inStream, Context context) {
+      return null;
+    }
+  }
+
   @Test
   public void testSplittableDoFnDefaultMethods() throws Exception {
     class MockFn extends DoFn<String, String> {
       @ProcessElement
-      public void processElement(ProcessContext c, SomeRestrictionTracker tracker) {}
+      public void processElement(ProcessContext c, DefaultTracker tracker) {}
 
       @GetInitialRestriction
-      public SomeRestriction getInitialRestriction(String element) {
-        return null;
-      }
-
-      @NewTracker
-      public SomeRestrictionTracker newTracker(SomeRestriction restriction) {
+      public RestrictionWithDefaultTracker getInitialRestriction(String element) {
         return null;
       }
     }
@@ -411,10 +442,10 @@ public class DoFnInvokersTest {
     DoFnInvoker<String, String> invoker = DoFnInvokers.invokerFor(fn);
 
     CoderRegistry coderRegistry = new CoderRegistry();
-    coderRegistry.registerCoder(SomeRestriction.class, SomeRestrictionCoder.class);
+    coderRegistry.registerCoder(RestrictionWithDefaultTracker.class, CoderForDefaultTracker.class);
     assertThat(
-        invoker.<SomeRestriction>invokeGetRestrictionCoder(coderRegistry),
-        instanceOf(SomeRestrictionCoder.class));
+        invoker.<RestrictionWithDefaultTracker>invokeGetRestrictionCoder(coderRegistry),
+        instanceOf(CoderForDefaultTracker.class));
     invoker.invokeSplitRestriction(
         "blah",
         "foo",
@@ -430,6 +461,9 @@ public class DoFnInvokersTest {
         });
     assertEquals(
         ProcessContinuation.stop(), invoker.invokeProcessElement(mockArgumentProvider));
+    assertThat(
+        invoker.invokeNewTracker(new RestrictionWithDefaultTracker()),
+        instanceOf(DefaultTracker.class));
   }
 
   // ---------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/beam/blob/03a99b4d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java
index c10d199..052feb8 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java
@@ -32,6 +32,7 @@ import org.apache.beam.sdk.transforms.DoFn.BoundedPerElement;
 import org.apache.beam.sdk.transforms.DoFn.UnboundedPerElement;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignaturesTestUtils.AnonymousMethod;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignaturesTestUtils.FakeDoFn;
+import org.apache.beam.sdk.transforms.splittabledofn.HasDefaultTracker;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.PCollection;
@@ -330,6 +331,48 @@ public class DoFnSignaturesSplittableDoFnTest {
     DoFnSignatures.getSignature(BadFn.class);
   }
 
+  abstract class SomeDefaultTracker implements RestrictionTracker<RestrictionWithDefaultTracker> {}
+  abstract class RestrictionWithDefaultTracker
+      implements HasDefaultTracker<RestrictionWithDefaultTracker, SomeDefaultTracker> {}
+
+  @Test
+  public void testHasDefaultTracker() throws Exception {
+    class Fn extends DoFn<Integer, String> {
+      @ProcessElement
+      public void process(ProcessContext c, SomeDefaultTracker tracker) {}
+
+      @GetInitialRestriction
+      public RestrictionWithDefaultTracker getInitialRestriction(Integer element) {
+        return null;
+      }
+    }
+
+    DoFnSignature signature = DoFnSignatures.getSignature(Fn.class);
+    assertEquals(
+        SomeDefaultTracker.class, signature.processElement().trackerT().getRawType());
+  }
+
+  @Test
+  public void testRestrictionHasDefaultTrackerProcessUsesWrongTracker() throws Exception {
+    class Fn extends DoFn<Integer, String> {
+      @ProcessElement
+      public void process(ProcessContext c, SomeRestrictionTracker tracker) {}
+
+      @GetInitialRestriction
+      public RestrictionWithDefaultTracker getInitialRestriction(Integer element) {
+        return null;
+      }
+    }
+
+    thrown.expectMessage(
+        "Has tracker type SomeRestrictionTracker, but the DoFn's tracker type was inferred as ");
+    thrown.expectMessage("SomeDefaultTracker");
+    thrown.expectMessage(
+        "from restriction type RestrictionWithDefaultTracker "
+            + "of @GetInitialRestriction method getInitialRestriction(Integer)");
+    DoFnSignatures.getSignature(Fn.class);
+  }
+
   @Test
   public void testNewTrackerReturnsWrongType() throws Exception {
     class BadFn extends DoFn<Integer, String> {


[3/3] beam git commit: This closes #2462

Posted by jk...@apache.org.
This closes #2462


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4a694ceb
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4a694ceb
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4a694ceb

Branch: refs/heads/master
Commit: 4a694cebb5deaf4f0d24bea18fa2194798a8ac0f
Parents: 4fd8dc4 03a99b4
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Apr 7 15:55:43 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Fri Apr 7 15:55:43 2017 -0700

----------------------------------------------------------------------
 ...ndedSplittableProcessElementInvokerTest.java |  5 --
 .../beam/runners/core/SplittableParDoTest.java  | 53 ++++++--------
 .../reflect/ByteBuddyDoFnInvokerFactory.java    | 26 ++++++-
 .../sdk/transforms/reflect/DoFnSignatures.java  | 74 +++++++++++++-------
 .../splittabledofn/HasDefaultTracker.java       | 30 ++++++++
 .../transforms/splittabledofn/OffsetRange.java  |  8 ++-
 .../beam/sdk/transforms/SplittableDoFnTest.java | 20 ------
 .../transforms/reflect/DoFnInvokersTest.java    | 54 +++++++++++---
 .../DoFnSignaturesSplittableDoFnTest.java       | 43 ++++++++++++
 9 files changed, 215 insertions(+), 98 deletions(-)
----------------------------------------------------------------------



[2/3] beam git commit: Cleanup: removes two unused constants

Posted by jk...@apache.org.
Cleanup: removes two unused constants


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8179d8c8
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8179d8c8
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8179d8c8

Branch: refs/heads/master
Commit: 8179d8c8af9f3afaa785755b13822d2bd508f789
Parents: 4fd8dc4
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Apr 7 15:54:03 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Fri Apr 7 15:55:27 2017 -0700

----------------------------------------------------------------------
 .../beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java   | 2 --
 1 file changed, 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/8179d8c8/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
index 8e3a37c..0155297 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
@@ -85,8 +85,6 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory {
   public static final String PROCESS_CONTEXT_PARAMETER_METHOD = "processContext";
   public static final String ON_TIMER_CONTEXT_PARAMETER_METHOD = "onTimerContext";
   public static final String WINDOW_PARAMETER_METHOD = "window";
-  public static final String INPUT_PROVIDER_PARAMETER_METHOD = "inputProvider";
-  public static final String OUTPUT_RECEIVER_PARAMETER_METHOD = "outputReceiver";
   public static final String RESTRICTION_TRACKER_PARAMETER_METHOD = "restrictionTracker";
   public static final String STATE_PARAMETER_METHOD = "state";
   public static final String TIMER_PARAMETER_METHOD = "timer";