You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/04/07 23:06:19 UTC
[1/3] beam git commit: [BEAM-65] Adds HasDefaultTracker for
RestrictionTracker inference
Repository: beam
Updated Branches:
refs/heads/master 4fd8dc401 -> 4a694cebb
[BEAM-65] Adds HasDefaultTracker for RestrictionTracker inference
Allows a restriction type to implement HasDefaultTracker;
in that case the splittable DoFn itself does not need to
implement NewTracker - it needs only ProcessElement and GetInitialRestriction.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/03a99b4d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/03a99b4d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/03a99b4d
Branch: refs/heads/master
Commit: 03a99b4df18ceeb7cd61953c461c4cca377a37ae
Parents: 8179d8c
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Apr 7 12:09:47 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Fri Apr 7 15:55:27 2017 -0700
----------------------------------------------------------------------
...ndedSplittableProcessElementInvokerTest.java | 5 --
.../beam/runners/core/SplittableParDoTest.java | 53 ++++++--------
.../reflect/ByteBuddyDoFnInvokerFactory.java | 24 ++++++-
.../sdk/transforms/reflect/DoFnSignatures.java | 74 +++++++++++++-------
.../splittabledofn/HasDefaultTracker.java | 30 ++++++++
.../transforms/splittabledofn/OffsetRange.java | 8 ++-
.../beam/sdk/transforms/SplittableDoFnTest.java | 20 ------
.../transforms/reflect/DoFnInvokersTest.java | 54 +++++++++++---
.../DoFnSignaturesSplittableDoFnTest.java | 43 ++++++++++++
9 files changed, 215 insertions(+), 96 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/03a99b4d/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java
index d7c9889..b85f481 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java
@@ -71,11 +71,6 @@ public class OutputAndTimeBoundedSplittableProcessElementInvokerTest {
public OffsetRange getInitialRestriction(Integer element) {
throw new UnsupportedOperationException("Should not be called in this test");
}
-
- @NewTracker
- public OffsetRangeTracker newTracker(OffsetRange range) {
- throw new UnsupportedOperationException("Should not be called in this test");
- }
}
private SplittableProcessElementInvoker<Integer, String, OffsetRange, OffsetRangeTracker>.Result
http://git-wip-us.apache.org/repos/asf/beam/blob/03a99b4d/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
index ee94ee0..6205777 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
@@ -45,6 +45,7 @@ import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnTester;
import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.splittabledofn.HasDefaultTracker;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
@@ -72,10 +73,20 @@ public class SplittableParDoTest {
private static final Duration MAX_BUNDLE_DURATION = Duration.standardSeconds(5);
// ----------------- Tests for whether the transform sets boundedness correctly --------------
- private static class SomeRestriction implements Serializable {}
+ private static class SomeRestriction
+ implements Serializable, HasDefaultTracker<SomeRestriction, SomeRestrictionTracker> {
+ @Override
+ public SomeRestrictionTracker newTracker() {
+ return new SomeRestrictionTracker(this);
+ }
+ }
private static class SomeRestrictionTracker implements RestrictionTracker<SomeRestriction> {
- private final SomeRestriction someRestriction = new SomeRestriction();
+ private final SomeRestriction someRestriction;
+
+ public SomeRestrictionTracker(SomeRestriction someRestriction) {
+ this.someRestriction = someRestriction;
+ }
@Override
public SomeRestriction currentRestriction() {
@@ -96,11 +107,6 @@ public class SplittableParDoTest {
public SomeRestriction getInitialRestriction(Integer element) {
return null;
}
-
- @NewTracker
- public SomeRestrictionTracker newTracker(SomeRestriction restriction) {
- return null;
- }
}
private static class UnboundedFakeFn extends DoFn<Integer, String> {
@@ -114,11 +120,6 @@ public class SplittableParDoTest {
public SomeRestriction getInitialRestriction(Integer element) {
return null;
}
-
- @NewTracker
- public SomeRestrictionTracker newTracker(SomeRestriction restriction) {
- return null;
- }
}
private static PCollection<Integer> makeUnboundedCollection(Pipeline pipeline) {
@@ -376,11 +377,6 @@ public class SplittableParDoTest {
public SomeRestriction getInitialRestriction(Integer elem) {
return new SomeRestriction();
}
-
- @NewTracker
- public SomeRestrictionTracker newTracker(SomeRestriction restriction) {
- return new SomeRestrictionTracker();
- }
}
@Test
@@ -438,11 +434,6 @@ public class SplittableParDoTest {
public SomeRestriction getInitialRestriction(Integer elem) {
return new SomeRestriction();
}
-
- @NewTracker
- public SomeRestrictionTracker newTracker(SomeRestriction restriction) {
- return new SomeRestrictionTracker();
- }
}
@Test
@@ -474,12 +465,18 @@ public class SplittableParDoTest {
assertThat(tester.takeOutputElements(), contains("42"));
}
- private static class SomeCheckpoint implements Serializable {
+ private static class SomeCheckpoint
+ implements Serializable, HasDefaultTracker<SomeCheckpoint, SomeCheckpointTracker> {
private int firstUnprocessedIndex;
private SomeCheckpoint(int firstUnprocessedIndex) {
this.firstUnprocessedIndex = firstUnprocessedIndex;
}
+
+ @Override
+ public SomeCheckpointTracker newTracker() {
+ return new SomeCheckpointTracker(this);
+ }
}
private static class SomeCheckpointTracker implements RestrictionTracker<SomeCheckpoint> {
@@ -543,11 +540,6 @@ public class SplittableParDoTest {
public SomeCheckpoint getInitialRestriction(Integer elem) {
throw new UnsupportedOperationException("Expected to be supplied explicitly in this test");
}
-
- @NewTracker
- public SomeCheckpointTracker newTracker(SomeCheckpoint restriction) {
- return new SomeCheckpointTracker(restriction);
- }
}
@Test
@@ -658,11 +650,6 @@ public class SplittableParDoTest {
return new SomeRestriction();
}
- @NewTracker
- public SomeRestrictionTracker newTracker(SomeRestriction restriction) {
- return new SomeRestrictionTracker();
- }
-
@Setup
public void setup() {
assertEquals(State.BEFORE_SETUP, state);
http://git-wip-us.apache.org/repos/asf/beam/blob/03a99b4d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
index 0155297..6746d3a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
@@ -74,6 +74,8 @@ import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.Restrictio
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.StateParameter;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.TimerParameter;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.WindowParameter;
+import org.apache.beam.sdk.transforms.splittabledofn.HasDefaultTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.util.Timer;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.values.TypeDescriptor;
@@ -257,6 +259,15 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory {
}
}
+ /** Default implementation of {@link DoFn.NewTracker}, for delegation by bytebuddy. */
+ public static class DefaultNewTracker {
+ /** Uses {@link HasDefaultTracker} to produce the tracker. */
+ @SuppressWarnings("unused")
+ public static RestrictionTracker invokeNewTracker(Object restriction) {
+ return ((HasDefaultTracker) restriction).newTracker();
+ }
+ }
+
/** Generates a {@link DoFnInvoker} class for the given {@link DoFnSignature}. */
private static Class<? extends DoFnInvoker<?, ?>> generateInvokerClass(DoFnSignature signature) {
Class<? extends DoFn<?, ?>> fnClass = signature.fnClass();
@@ -306,7 +317,7 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory {
.method(ElementMatchers.named("invokeGetRestrictionCoder"))
.intercept(getRestrictionCoderDelegation(clazzDescription, signature))
.method(ElementMatchers.named("invokeNewTracker"))
- .intercept(delegateWithDowncastOrThrow(clazzDescription, signature.newTracker()));
+ .intercept(newTrackerDelegation(clazzDescription, signature.newTracker()));
DynamicType.Unloaded<?> unloaded = builder.make();
@@ -346,6 +357,17 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory {
}
}
+ private static Implementation newTrackerDelegation(
+ TypeDescription doFnType, @Nullable DoFnSignature.NewTrackerMethod signature) {
+ if (signature == null) {
+ // We must have already verified that in this case the restriction type
+ // is a subtype of HasDefaultTracker.
+ return MethodDelegation.to(DefaultNewTracker.class);
+ } else {
+ return delegateWithDowncastOrThrow(doFnType, signature);
+ }
+ }
+
/** Delegates to the given method if available, or does nothing. */
private static Implementation delegateOrNoop(
TypeDescription doFnType, DoFnSignature.DoFnMethod method) {
http://git-wip-us.apache.org/repos/asf/beam/blob/03a99b4d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
index 61b9157..006d012 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
@@ -54,6 +54,7 @@ import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.TimerParam
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.WindowParameter;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.StateDeclaration;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature.TimerDeclaration;
+import org.apache.beam.sdk.transforms.splittabledofn.HasDefaultTracker;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.Timer;
@@ -368,42 +369,35 @@ public class DoFnSignatures {
errors.forMethod(DoFn.Teardown.class, teardownMethod), teardownMethod));
}
- DoFnSignature.GetInitialRestrictionMethod getInitialRestriction = null;
- ErrorReporter getInitialRestrictionErrors = null;
+ ErrorReporter getInitialRestrictionErrors;
if (getInitialRestrictionMethod != null) {
getInitialRestrictionErrors =
errors.forMethod(DoFn.GetInitialRestriction.class, getInitialRestrictionMethod);
signatureBuilder.setGetInitialRestriction(
- getInitialRestriction =
analyzeGetInitialRestrictionMethod(
getInitialRestrictionErrors, fnT, getInitialRestrictionMethod, inputT));
}
- DoFnSignature.SplitRestrictionMethod splitRestriction = null;
if (splitRestrictionMethod != null) {
ErrorReporter splitRestrictionErrors =
errors.forMethod(DoFn.SplitRestriction.class, splitRestrictionMethod);
signatureBuilder.setSplitRestriction(
- splitRestriction =
analyzeSplitRestrictionMethod(
splitRestrictionErrors, fnT, splitRestrictionMethod, inputT));
}
- DoFnSignature.GetRestrictionCoderMethod getRestrictionCoder = null;
if (getRestrictionCoderMethod != null) {
ErrorReporter getRestrictionCoderErrors =
errors.forMethod(DoFn.GetRestrictionCoder.class, getRestrictionCoderMethod);
signatureBuilder.setGetRestrictionCoder(
- getRestrictionCoder =
analyzeGetRestrictionCoderMethod(
getRestrictionCoderErrors, fnT, getRestrictionCoderMethod));
}
- DoFnSignature.NewTrackerMethod newTracker = null;
if (newTrackerMethod != null) {
ErrorReporter newTrackerErrors = errors.forMethod(DoFn.NewTracker.class, newTrackerMethod);
signatureBuilder.setNewTracker(
- newTracker = analyzeNewTrackerMethod(newTrackerErrors, fnT, newTrackerMethod));
+ analyzeNewTrackerMethod(newTrackerErrors, fnT, newTrackerMethod));
}
signatureBuilder.setIsBoundedPerElement(inferBoundedness(fnT, processElement, errors));
@@ -501,38 +495,66 @@ public class DoFnSignatures {
ErrorReporter processElementErrors =
errors.forMethod(DoFn.ProcessElement.class, processElement.targetMethod());
+ final TypeDescriptor<?> trackerT;
+ final String originOfTrackerT;
+
List<String> missingRequiredMethods = new ArrayList<>();
if (getInitialRestriction == null) {
missingRequiredMethods.add("@" + DoFn.GetInitialRestriction.class.getSimpleName());
}
if (newTracker == null) {
- missingRequiredMethods.add("@" + DoFn.NewTracker.class.getSimpleName());
+ if (getInitialRestriction != null
+ && getInitialRestriction
+ .restrictionT()
+ .isSubtypeOf(TypeDescriptor.of(HasDefaultTracker.class))) {
+ trackerT =
+ getInitialRestriction
+ .restrictionT()
+ .resolveType(HasDefaultTracker.class.getTypeParameters()[1]);
+ originOfTrackerT =
+ String.format(
+ "restriction type %s of @%s method %s",
+ formatType(getInitialRestriction.restrictionT()),
+ DoFn.GetInitialRestriction.class.getSimpleName(),
+ format(getInitialRestriction.targetMethod()));
+ } else {
+ missingRequiredMethods.add("@" + DoFn.NewTracker.class.getSimpleName());
+ trackerT = null;
+ originOfTrackerT = null;
+ }
+ } else {
+ trackerT = newTracker.trackerT();
+ originOfTrackerT =
+ String.format(
+ "%s method %s",
+ DoFn.NewTracker.class.getSimpleName(), format(newTracker.targetMethod()));
+ ErrorReporter getInitialRestrictionErrors =
+ errors.forMethod(DoFn.GetInitialRestriction.class, getInitialRestriction.targetMethod());
+ TypeDescriptor<?> restrictionT = getInitialRestriction.restrictionT();
+ getInitialRestrictionErrors.checkArgument(
+ restrictionT.equals(newTracker.restrictionT()),
+ "Uses restriction type %s, but @%s method %s uses restriction type %s",
+ formatType(restrictionT),
+ DoFn.NewTracker.class.getSimpleName(),
+ format(newTracker.targetMethod()),
+ formatType(newTracker.restrictionT()));
}
+
if (!missingRequiredMethods.isEmpty()) {
processElementErrors.throwIllegalArgument(
"Splittable, but does not define the following required methods: %s",
missingRequiredMethods);
}
- processElementErrors.checkArgument(
- processElement.trackerT().equals(newTracker.trackerT()),
- "Has tracker type %s, but @%s method %s uses tracker type %s",
- formatType(processElement.trackerT()),
- DoFn.NewTracker.class.getSimpleName(),
- format(newTracker.targetMethod()),
- formatType(newTracker.trackerT()));
-
ErrorReporter getInitialRestrictionErrors =
errors.forMethod(DoFn.GetInitialRestriction.class, getInitialRestriction.targetMethod());
TypeDescriptor<?> restrictionT = getInitialRestriction.restrictionT();
-
- getInitialRestrictionErrors.checkArgument(
- restrictionT.equals(newTracker.restrictionT()),
- "Uses restriction type %s, but @%s method %s uses restriction type %s",
- formatType(restrictionT),
- DoFn.NewTracker.class.getSimpleName(),
- format(newTracker.targetMethod()),
- formatType(newTracker.restrictionT()));
+ processElementErrors.checkArgument(
+ processElement.trackerT().equals(trackerT),
+ "Has tracker type %s, but the DoFn's tracker type was inferred as %s from %s",
+ formatType(processElement.trackerT()),
+ trackerT,
+ originOfTrackerT);
if (getRestrictionCoder != null) {
getInitialRestrictionErrors.checkArgument(
http://git-wip-us.apache.org/repos/asf/beam/blob/03a99b4d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/HasDefaultTracker.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/HasDefaultTracker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/HasDefaultTracker.java
new file mode 100644
index 0000000..3366dfe
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/HasDefaultTracker.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.splittabledofn;
+
+/**
+ * Interface for restrictions for which a default implementation of {@link
+ * org.apache.beam.sdk.transforms.DoFn.NewTracker} is available, depending only on the restriction
+ * itself.
+ */
+public interface HasDefaultTracker<
+ RestrictionT extends HasDefaultTracker<RestrictionT, TrackerT>,
+ TrackerT extends RestrictionTracker<RestrictionT>> {
+ /** Creates a new tracker for {@code this}. */
+ TrackerT newTracker();
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/03a99b4d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRange.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRange.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRange.java
index 67031c4..104f5f2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRange.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRange.java
@@ -22,7 +22,8 @@ import static com.google.common.base.Preconditions.checkArgument;
import java.io.Serializable;
/** A restriction represented by a range of integers [from, to). */
-public class OffsetRange implements Serializable {
+public class OffsetRange
+ implements Serializable, HasDefaultTracker<OffsetRange, OffsetRangeTracker> {
private final long from;
private final long to;
@@ -41,6 +42,11 @@ public class OffsetRange implements Serializable {
}
@Override
+ public OffsetRangeTracker newTracker() {
+ return new OffsetRangeTracker(this);
+ }
+
+ @Override
public String toString() {
return "[" + from + ", " + to + ')';
}
http://git-wip-us.apache.org/repos/asf/beam/blob/03a99b4d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
index d926f6c..154a088 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
@@ -88,11 +88,6 @@ public class SplittableDoFnTest {
receiver.output(new OffsetRange(range.getFrom(), (range.getFrom() + range.getTo()) / 2));
receiver.output(new OffsetRange((range.getFrom() + range.getTo()) / 2, range.getTo()));
}
-
- @NewTracker
- public OffsetRangeTracker newTracker(OffsetRange range) {
- return new OffsetRangeTracker(range);
- }
}
private static class ReifyTimestampsFn<T> extends DoFn<T, TimestampedValue<T>> {
@@ -220,11 +215,6 @@ public class SplittableDoFnTest {
public OffsetRange getInitialRange(String element) {
return new OffsetRange(0, MAX_INDEX);
}
-
- @NewTracker
- public OffsetRangeTracker newTracker(OffsetRange range) {
- return new OffsetRangeTracker(range);
- }
}
@Test
@@ -259,11 +249,6 @@ public class SplittableDoFnTest {
public OffsetRange getInitialRestriction(Integer value) {
return new OffsetRange(0, 1);
}
-
- @NewTracker
- public OffsetRangeTracker newTracker(OffsetRange range) {
- return new OffsetRangeTracker(range);
- }
}
@Test
@@ -357,11 +342,6 @@ public class SplittableDoFnTest {
return new OffsetRange(0, 1);
}
- @NewTracker
- public OffsetRangeTracker newTracker(OffsetRange range) {
- return new OffsetRangeTracker(range);
- }
-
@Setup
public void setUp() {
assertEquals(State.BEFORE_SETUP, state);
http://git-wip-us.apache.org/repos/asf/beam/blob/03a99b4d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
index 5d3746f..425f453 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
@@ -42,6 +42,7 @@ import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
import org.apache.beam.sdk.transforms.reflect.DoFnInvoker.FakeArgumentProvider;
import org.apache.beam.sdk.transforms.reflect.testhelper.DoFnInvokersTestHelper;
+import org.apache.beam.sdk.transforms.splittabledofn.HasDefaultTracker;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
@@ -391,19 +392,49 @@ public class DoFnInvokersTest {
}));
}
+ private static class RestrictionWithDefaultTracker
+ implements HasDefaultTracker<RestrictionWithDefaultTracker, DefaultTracker> {
+ @Override
+ public DefaultTracker newTracker() {
+ return new DefaultTracker();
+ }
+ }
+
+ private static class DefaultTracker implements RestrictionTracker<RestrictionWithDefaultTracker> {
+ @Override
+ public RestrictionWithDefaultTracker currentRestriction() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public RestrictionWithDefaultTracker checkpoint() {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ private static class CoderForDefaultTracker extends CustomCoder<RestrictionWithDefaultTracker> {
+ public static CoderForDefaultTracker of() {
+ return new CoderForDefaultTracker();
+ }
+
+ @Override
+ public void encode(
+ RestrictionWithDefaultTracker value, OutputStream outStream, Context context) {}
+
+ @Override
+ public RestrictionWithDefaultTracker decode(InputStream inStream, Context context) {
+ return null;
+ }
+ }
+
@Test
public void testSplittableDoFnDefaultMethods() throws Exception {
class MockFn extends DoFn<String, String> {
@ProcessElement
- public void processElement(ProcessContext c, SomeRestrictionTracker tracker) {}
+ public void processElement(ProcessContext c, DefaultTracker tracker) {}
@GetInitialRestriction
- public SomeRestriction getInitialRestriction(String element) {
- return null;
- }
-
- @NewTracker
- public SomeRestrictionTracker newTracker(SomeRestriction restriction) {
+ public RestrictionWithDefaultTracker getInitialRestriction(String element) {
return null;
}
}
@@ -411,10 +442,10 @@ public class DoFnInvokersTest {
DoFnInvoker<String, String> invoker = DoFnInvokers.invokerFor(fn);
CoderRegistry coderRegistry = new CoderRegistry();
- coderRegistry.registerCoder(SomeRestriction.class, SomeRestrictionCoder.class);
+ coderRegistry.registerCoder(RestrictionWithDefaultTracker.class, CoderForDefaultTracker.class);
assertThat(
- invoker.<SomeRestriction>invokeGetRestrictionCoder(coderRegistry),
- instanceOf(SomeRestrictionCoder.class));
+ invoker.<RestrictionWithDefaultTracker>invokeGetRestrictionCoder(coderRegistry),
+ instanceOf(CoderForDefaultTracker.class));
invoker.invokeSplitRestriction(
"blah",
"foo",
@@ -430,6 +461,9 @@ public class DoFnInvokersTest {
});
assertEquals(
ProcessContinuation.stop(), invoker.invokeProcessElement(mockArgumentProvider));
+ assertThat(
+ invoker.invokeNewTracker(new RestrictionWithDefaultTracker()),
+ instanceOf(DefaultTracker.class));
}
// ---------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/03a99b4d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java
index c10d199..052feb8 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java
@@ -32,6 +32,7 @@ import org.apache.beam.sdk.transforms.DoFn.BoundedPerElement;
import org.apache.beam.sdk.transforms.DoFn.UnboundedPerElement;
import org.apache.beam.sdk.transforms.reflect.DoFnSignaturesTestUtils.AnonymousMethod;
import org.apache.beam.sdk.transforms.reflect.DoFnSignaturesTestUtils.FakeDoFn;
+import org.apache.beam.sdk.transforms.splittabledofn.HasDefaultTracker;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.PCollection;
@@ -330,6 +331,48 @@ public class DoFnSignaturesSplittableDoFnTest {
DoFnSignatures.getSignature(BadFn.class);
}
+ abstract class SomeDefaultTracker implements RestrictionTracker<RestrictionWithDefaultTracker> {}
+ abstract class RestrictionWithDefaultTracker
+ implements HasDefaultTracker<RestrictionWithDefaultTracker, SomeDefaultTracker> {}
+
+ @Test
+ public void testHasDefaultTracker() throws Exception {
+ class Fn extends DoFn<Integer, String> {
+ @ProcessElement
+ public void process(ProcessContext c, SomeDefaultTracker tracker) {}
+
+ @GetInitialRestriction
+ public RestrictionWithDefaultTracker getInitialRestriction(Integer element) {
+ return null;
+ }
+ }
+
+ DoFnSignature signature = DoFnSignatures.getSignature(Fn.class);
+ assertEquals(
+ SomeDefaultTracker.class, signature.processElement().trackerT().getRawType());
+ }
+
+ @Test
+ public void testRestrictionHasDefaultTrackerProcessUsesWrongTracker() throws Exception {
+ class Fn extends DoFn<Integer, String> {
+ @ProcessElement
+ public void process(ProcessContext c, SomeRestrictionTracker tracker) {}
+
+ @GetInitialRestriction
+ public RestrictionWithDefaultTracker getInitialRestriction(Integer element) {
+ return null;
+ }
+ }
+
+ thrown.expectMessage(
+ "Has tracker type SomeRestrictionTracker, but the DoFn's tracker type was inferred as ");
+ thrown.expectMessage("SomeDefaultTracker");
+ thrown.expectMessage(
+ "from restriction type RestrictionWithDefaultTracker "
+ + "of @GetInitialRestriction method getInitialRestriction(Integer)");
+ DoFnSignatures.getSignature(Fn.class);
+ }
+
@Test
public void testNewTrackerReturnsWrongType() throws Exception {
class BadFn extends DoFn<Integer, String> {
[3/3] beam git commit: This closes #2462
Posted by jk...@apache.org.
This closes #2462
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4a694ceb
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4a694ceb
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4a694ceb
Branch: refs/heads/master
Commit: 4a694cebb5deaf4f0d24bea18fa2194798a8ac0f
Parents: 4fd8dc4 03a99b4
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Apr 7 15:55:43 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Fri Apr 7 15:55:43 2017 -0700
----------------------------------------------------------------------
...ndedSplittableProcessElementInvokerTest.java | 5 --
.../beam/runners/core/SplittableParDoTest.java | 53 ++++++--------
.../reflect/ByteBuddyDoFnInvokerFactory.java | 26 ++++++-
.../sdk/transforms/reflect/DoFnSignatures.java | 74 +++++++++++++-------
.../splittabledofn/HasDefaultTracker.java | 30 ++++++++
.../transforms/splittabledofn/OffsetRange.java | 8 ++-
.../beam/sdk/transforms/SplittableDoFnTest.java | 20 ------
.../transforms/reflect/DoFnInvokersTest.java | 54 +++++++++++---
.../DoFnSignaturesSplittableDoFnTest.java | 43 ++++++++++++
9 files changed, 215 insertions(+), 98 deletions(-)
----------------------------------------------------------------------
[2/3] beam git commit: Cleanup: removes two unused constants
Posted by jk...@apache.org.
Cleanup: removes two unused constants
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8179d8c8
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8179d8c8
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8179d8c8
Branch: refs/heads/master
Commit: 8179d8c8af9f3afaa785755b13822d2bd508f789
Parents: 4fd8dc4
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Apr 7 15:54:03 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Fri Apr 7 15:55:27 2017 -0700
----------------------------------------------------------------------
.../beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java | 2 --
1 file changed, 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/8179d8c8/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
index 8e3a37c..0155297 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
@@ -85,8 +85,6 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory {
public static final String PROCESS_CONTEXT_PARAMETER_METHOD = "processContext";
public static final String ON_TIMER_CONTEXT_PARAMETER_METHOD = "onTimerContext";
public static final String WINDOW_PARAMETER_METHOD = "window";
- public static final String INPUT_PROVIDER_PARAMETER_METHOD = "inputProvider";
- public static final String OUTPUT_RECEIVER_PARAMETER_METHOD = "outputReceiver";
public static final String RESTRICTION_TRACKER_PARAMETER_METHOD = "restrictionTracker";
public static final String STATE_PARAMETER_METHOD = "state";
public static final String TIMER_PARAMETER_METHOD = "timer";