You are viewing a plain-text version of this content. The link to the canonical version ("here") was not preserved in this plain-text rendering.
Posted to commits@beam.apache.org by bc...@apache.org on 2016/10/13 00:38:28 UTC
[1/4] incubator-beam git commit: [BEAM-65] SplittableDoFn prototype.
Repository: incubator-beam
Updated Branches:
refs/heads/master a5d129361 -> 13b45895e
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0a24883/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
index d057765..0bfe2be 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
@@ -17,14 +17,32 @@
*/
package org.apache.beam.sdk.transforms.reflect;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.same;
import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
-
+import static org.mockito.Mockito.when;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.GetInitialRestriction;
+import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.reflect.testhelper.DoFnInvokersTestHelper;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.util.WindowingInternals;
@@ -34,7 +52,9 @@ import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
+import org.mockito.AdditionalAnswers;
import org.mockito.Mock;
+import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
/** Tests for {@link DoFnInvokers}. */
@@ -76,11 +96,16 @@ public class DoFnInvokersTest {
public WindowingInternals<String, String> windowingInternals() {
return mockWindowingInternals;
}
+
+ @Override
+ public <RestrictionT> RestrictionTracker<RestrictionT> restrictionTracker() {
+ return null;
+ }
};
}
- private void invokeProcessElement(DoFn<String, String> fn) {
- DoFnInvokers.INSTANCE
+ private ProcessContinuation invokeProcessElement(DoFn<String, String> fn) {
+ return DoFnInvokers.INSTANCE
.newByteBuddyInvoker(fn)
.invokeProcessElement(mockContext, extraContextFactory);
}
@@ -106,9 +131,9 @@ public class DoFnInvokersTest {
@ProcessElement
public void processElement(ProcessContext c) throws Exception {}
}
- MockFn fn = mock(MockFn.class);
- invokeProcessElement(fn);
- verify(fn).processElement(mockContext);
+ MockFn mockFn = mock(MockFn.class);
+ assertEquals(ProcessContinuation.stop(), invokeProcessElement(mockFn));
+ verify(mockFn).processElement(mockContext);
}
interface InterfaceWithProcessElement {
@@ -128,7 +153,7 @@ public class DoFnInvokersTest {
public void testDoFnWithProcessElementInterface() throws Exception {
IdentityUsingInterfaceWithProcessElement fn =
mock(IdentityUsingInterfaceWithProcessElement.class);
- invokeProcessElement(fn);
+ assertEquals(ProcessContinuation.stop(), invokeProcessElement(fn));
verify(fn).processElement(mockContext);
}
@@ -149,14 +174,14 @@ public class DoFnInvokersTest {
@Test
public void testDoFnWithMethodInSuperclass() throws Exception {
IdentityChildWithoutOverride fn = mock(IdentityChildWithoutOverride.class);
- invokeProcessElement(fn);
+ assertEquals(ProcessContinuation.stop(), invokeProcessElement(fn));
verify(fn).process(mockContext);
}
@Test
public void testDoFnWithMethodInSubclass() throws Exception {
IdentityChildWithOverride fn = mock(IdentityChildWithOverride.class);
- invokeProcessElement(fn);
+ assertEquals(ProcessContinuation.stop(), invokeProcessElement(fn));
verify(fn).process(mockContext);
}
@@ -167,7 +192,7 @@ public class DoFnInvokersTest {
public void processElement(ProcessContext c, BoundedWindow w) throws Exception {}
}
MockFn fn = mock(MockFn.class);
- invokeProcessElement(fn);
+ assertEquals(ProcessContinuation.stop(), invokeProcessElement(fn));
verify(fn).processElement(mockContext, mockWindow);
}
@@ -178,7 +203,7 @@ public class DoFnInvokersTest {
public void processElement(ProcessContext c, OutputReceiver<String> o) throws Exception {}
}
MockFn fn = mock(MockFn.class);
- invokeProcessElement(fn);
+ assertEquals(ProcessContinuation.stop(), invokeProcessElement(fn));
verify(fn).processElement(mockContext, mockOutputReceiver);
}
@@ -189,11 +214,35 @@ public class DoFnInvokersTest {
public void processElement(ProcessContext c, InputProvider<String> o) throws Exception {}
}
MockFn fn = mock(MockFn.class);
- invokeProcessElement(fn);
+ assertEquals(ProcessContinuation.stop(), invokeProcessElement(fn));
verify(fn).processElement(mockContext, mockInputProvider);
}
@Test
+ public void testDoFnWithReturn() throws Exception {
+ class MockFn extends DoFn<String, String> {
+ @DoFn.ProcessElement
+ public ProcessContinuation processElement(ProcessContext c, SomeRestrictionTracker tracker)
+ throws Exception {
+ return null;
+ }
+
+ @GetInitialRestriction
+ public SomeRestriction getInitialRestriction(String element) {
+ return null;
+ }
+
+ @NewTracker
+ public SomeRestrictionTracker newTracker(SomeRestriction restriction) {
+ return null;
+ }
+ }
+ MockFn fn = mock(MockFn.class);
+ when(fn.processElement(mockContext, null)).thenReturn(ProcessContinuation.resume());
+ assertEquals(ProcessContinuation.resume(), invokeProcessElement(fn));
+ }
+
+ @Test
public void testDoFnWithStartBundleSetupTeardown() throws Exception {
class MockFn extends DoFn<String, String> {
@ProcessElement
@@ -224,6 +273,154 @@ public class DoFnInvokersTest {
}
// ---------------------------------------------------------------------------------------
+ // Tests for invoking Splittable DoFn methods
+ // ---------------------------------------------------------------------------------------
+ private static class SomeRestriction {}
+
+ private abstract static class SomeRestrictionTracker
+ implements RestrictionTracker<SomeRestriction> {}
+
+ private static class SomeRestrictionCoder extends CustomCoder<SomeRestriction> {
+ public static SomeRestrictionCoder of() {
+ return new SomeRestrictionCoder();
+ }
+
+ @Override
+ public void encode(SomeRestriction value, OutputStream outStream, Context context) {}
+
+ @Override
+ public SomeRestriction decode(InputStream inStream, Context context) {
+ return null;
+ }
+ }
+
+ /** Public so Mockito can do "delegatesTo()" in the test below. */
+ public static class MockFn extends DoFn<String, String> {
+ @ProcessElement
+ public ProcessContinuation processElement(ProcessContext c, SomeRestrictionTracker tracker) {
+ return null;
+ }
+
+ @GetInitialRestriction
+ public SomeRestriction getInitialRestriction(String element) {
+ return null;
+ }
+
+ @SplitRestriction
+ public void splitRestriction(
+ String element, SomeRestriction restriction, OutputReceiver<SomeRestriction> receiver) {}
+
+ @NewTracker
+ public SomeRestrictionTracker newTracker(SomeRestriction restriction) {
+ return null;
+ }
+
+ @GetRestrictionCoder
+ public SomeRestrictionCoder getRestrictionCoder() {
+ return null;
+ }
+ }
+
+ @Test
+ public void testSplittableDoFnWithAllMethods() throws Exception {
+ MockFn fn = mock(MockFn.class);
+ DoFnInvoker<String, String> invoker = DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn);
+ final SomeRestrictionTracker tracker = mock(SomeRestrictionTracker.class);
+ final SomeRestrictionCoder coder = mock(SomeRestrictionCoder.class);
+ SomeRestriction restriction = new SomeRestriction();
+ final SomeRestriction part1 = new SomeRestriction();
+ final SomeRestriction part2 = new SomeRestriction();
+ final SomeRestriction part3 = new SomeRestriction();
+ when(fn.getRestrictionCoder()).thenReturn(coder);
+ when(fn.getInitialRestriction("blah")).thenReturn(restriction);
+ doAnswer(
+ AdditionalAnswers.delegatesTo(
+ new MockFn() {
+ @DoFn.SplitRestriction
+ @Override
+ public void splitRestriction(
+ String element,
+ SomeRestriction restriction,
+ DoFn.OutputReceiver<SomeRestriction> receiver) {
+ receiver.output(part1);
+ receiver.output(part2);
+ receiver.output(part3);
+ }
+ }))
+ .when(fn)
+ .splitRestriction(
+ eq("blah"), same(restriction), Mockito.<DoFn.OutputReceiver<SomeRestriction>>any());
+ when(fn.newTracker(restriction)).thenReturn(tracker);
+ when(fn.processElement(mockContext, tracker)).thenReturn(ProcessContinuation.resume());
+
+ assertEquals(coder, invoker.invokeGetRestrictionCoder(new CoderRegistry()));
+ assertEquals(restriction, invoker.invokeGetInitialRestriction("blah"));
+ final List<SomeRestriction> outputs = new ArrayList<>();
+ invoker.invokeSplitRestriction(
+ "blah",
+ restriction,
+ new DoFn.OutputReceiver<SomeRestriction>() {
+ @Override
+ public void output(SomeRestriction output) {
+ outputs.add(output);
+ }
+ });
+ assertEquals(Arrays.asList(part1, part2, part3), outputs);
+ assertEquals(tracker, invoker.invokeNewTracker(restriction));
+ assertEquals(
+ ProcessContinuation.resume(),
+ invoker.invokeProcessElement(
+ mockContext,
+ new DoFn.FakeExtraContextFactory<String, String>() {
+ @Override
+ public RestrictionTracker restrictionTracker() {
+ return tracker;
+ }
+ }));
+ }
+
+ @Test
+ public void testSplittableDoFnDefaultMethods() throws Exception {
+ class MockFn extends DoFn<String, String> {
+ @ProcessElement
+ public void processElement(ProcessContext c, SomeRestrictionTracker tracker) {}
+
+ @GetInitialRestriction
+ public SomeRestriction getInitialRestriction(String element) {
+ return null;
+ }
+
+ @NewTracker
+ public SomeRestrictionTracker newTracker(SomeRestriction restriction) {
+ return null;
+ }
+ }
+ MockFn fn = mock(MockFn.class);
+ DoFnInvoker<String, String> invoker = DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn);
+
+ CoderRegistry coderRegistry = new CoderRegistry();
+ coderRegistry.registerCoder(SomeRestriction.class, SomeRestrictionCoder.class);
+ assertThat(
+ invoker.<SomeRestriction>invokeGetRestrictionCoder(coderRegistry),
+ instanceOf(SomeRestrictionCoder.class));
+ final List<String> splitOutputs = new ArrayList<>();
+ invoker.invokeSplitRestriction(
+ "blah",
+ "foo",
+ new DoFn.OutputReceiver<String>() {
+ @Override
+ public void output(String output) {
+ splitOutputs.add(output);
+ }
+ });
+ // Expects the default @SplitRestriction to output the unmodified restriction exactly once.
+ assertFalse("default @SplitRestriction produced no output", splitOutputs.isEmpty());
+ assertEquals(Arrays.asList("foo"), splitOutputs);
+ assertEquals(
+ ProcessContinuation.stop(), invoker.invokeProcessElement(mockContext, extraContextFactory));
+ }
+
+ // ---------------------------------------------------------------------------------------
// Tests for ability to invoke private, inner and anonymous classes.
// ---------------------------------------------------------------------------------------
@@ -235,14 +432,14 @@ public class DoFnInvokersTest {
@Test
public void testLocalPrivateDoFnClass() throws Exception {
PrivateDoFnClass fn = mock(PrivateDoFnClass.class);
- invokeProcessElement(fn);
+ assertEquals(ProcessContinuation.stop(), invokeProcessElement(fn));
verify(fn).processThis(mockContext);
}
@Test
public void testStaticPackagePrivateDoFnClass() throws Exception {
DoFn<String, String> fn = mock(DoFnInvokersTestHelper.newStaticPackagePrivateDoFn().getClass());
- invokeProcessElement(fn);
+ assertEquals(ProcessContinuation.stop(), invokeProcessElement(fn));
DoFnInvokersTestHelper.verifyStaticPackagePrivateDoFn(fn, mockContext);
}
@@ -250,28 +447,28 @@ public class DoFnInvokersTest {
public void testInnerPackagePrivateDoFnClass() throws Exception {
DoFn<String, String> fn =
mock(new DoFnInvokersTestHelper().newInnerPackagePrivateDoFn().getClass());
- invokeProcessElement(fn);
+ assertEquals(ProcessContinuation.stop(), invokeProcessElement(fn));
DoFnInvokersTestHelper.verifyInnerPackagePrivateDoFn(fn, mockContext);
}
@Test
public void testStaticPrivateDoFnClass() throws Exception {
DoFn<String, String> fn = mock(DoFnInvokersTestHelper.newStaticPrivateDoFn().getClass());
- invokeProcessElement(fn);
+ assertEquals(ProcessContinuation.stop(), invokeProcessElement(fn));
DoFnInvokersTestHelper.verifyStaticPrivateDoFn(fn, mockContext);
}
@Test
public void testInnerPrivateDoFnClass() throws Exception {
DoFn<String, String> fn = mock(new DoFnInvokersTestHelper().newInnerPrivateDoFn().getClass());
- invokeProcessElement(fn);
+ assertEquals(ProcessContinuation.stop(), invokeProcessElement(fn));
DoFnInvokersTestHelper.verifyInnerPrivateDoFn(fn, mockContext);
}
@Test
public void testAnonymousInnerDoFn() throws Exception {
DoFn<String, String> fn = mock(new DoFnInvokersTestHelper().newInnerAnonymousDoFn().getClass());
- invokeProcessElement(fn);
+ assertEquals(ProcessContinuation.stop(), invokeProcessElement(fn));
DoFnInvokersTestHelper.verifyInnerAnonymousDoFn(fn, mockContext);
}
@@ -279,7 +476,7 @@ public class DoFnInvokersTest {
public void testStaticAnonymousDoFnInOtherPackage() throws Exception {
// Can't use mockito for this one - the anonymous class is final and can't be mocked.
DoFn<String, String> fn = DoFnInvokersTestHelper.newStaticAnonymousDoFn();
- invokeProcessElement(fn);
+ assertEquals(ProcessContinuation.stop(), invokeProcessElement(fn));
DoFnInvokersTestHelper.verifyStaticAnonymousDoFnInvoked(fn, mockContext);
}
@@ -303,6 +500,32 @@ public class DoFnInvokersTest {
}
@Test
+ public void testProcessElementExceptionWithReturn() throws Exception {
+ thrown.expect(UserCodeException.class);
+ thrown.expectMessage("bogus");
+ DoFnInvokers.INSTANCE
+ .newByteBuddyInvoker(
+ new DoFn<Integer, Integer>() {
+ @ProcessElement
+ public ProcessContinuation processElement(
+ @SuppressWarnings("unused") ProcessContext c, SomeRestrictionTracker tracker) {
+ throw new IllegalArgumentException("bogus");
+ }
+
+ @GetInitialRestriction
+ public SomeRestriction getInitialRestriction(Integer element) {
+ return null;
+ }
+
+ @NewTracker
+ public SomeRestrictionTracker newTracker(SomeRestriction restriction) {
+ return null;
+ }
+ })
+ .invokeProcessElement(null, new DoFn.FakeExtraContextFactory<Integer, Integer>());
+ }
+
+ @Test
public void testStartBundleException() throws Exception {
DoFnInvoker<Integer, Integer> invoker =
DoFnInvokers.INSTANCE.newByteBuddyInvoker(
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0a24883/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesProcessElementTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesProcessElementTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesProcessElementTest.java
index c269dbd..9cb1d23 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesProcessElementTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesProcessElementTest.java
@@ -61,7 +61,7 @@ public class DoFnSignaturesProcessElementTest {
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage(
"Integer is not a valid context parameter. "
- + "Should be one of [BoundedWindow]");
+ + "Should be one of [BoundedWindow, RestrictionTracker<?>]");
analyzeProcessElementMethod(
new AnonymousMethod() {
@@ -72,7 +72,7 @@ public class DoFnSignaturesProcessElementTest {
@Test
public void testBadReturnType() throws Exception {
thrown.expect(IllegalArgumentException.class);
- thrown.expectMessage("Must return void");
+ thrown.expectMessage("Must return void or ProcessContinuation");
analyzeProcessElementMethod(
new AnonymousMethod() {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0a24883/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java
new file mode 100644
index 0000000..a9a7c81
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java
@@ -0,0 +1,543 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.reflect;
+
+import static org.apache.beam.sdk.transforms.reflect.DoFnSignaturesTestUtils.analyzeProcessElementMethod;
+import static org.apache.beam.sdk.transforms.reflect.DoFnSignaturesTestUtils.errors;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import com.google.common.reflect.TypeToken;
+import java.util.List;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.BoundedPerElement;
+import org.apache.beam.sdk.transforms.DoFn.UnboundedPerElement;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignaturesTestUtils.AnonymousMethod;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignaturesTestUtils.FakeDoFn;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link DoFnSignatures} focused on methods related to <a
+ * href="https://s.apache.org/splittable-do-fn">splittable</a> {@link DoFn}.
+ */
+@SuppressWarnings("unused")
+@RunWith(JUnit4.class)
+public class DoFnSignaturesSplittableDoFnTest {
+ @Rule public ExpectedException thrown = ExpectedException.none();
+
+ private static class SomeRestriction {}
+
+ private abstract static class SomeRestrictionTracker
+ implements RestrictionTracker<SomeRestriction> {}
+
+ private abstract static class SomeRestrictionCoder implements Coder<SomeRestriction> {}
+
+ @Test
+ public void testReturnsProcessContinuation() throws Exception {
+ DoFnSignature.ProcessElementMethod signature =
+ analyzeProcessElementMethod(
+ new AnonymousMethod() {
+ private DoFn.ProcessContinuation method(
+ DoFn<Integer, String>.ProcessContext context) {
+ return null;
+ }
+ });
+
+ assertTrue(signature.hasReturnValue());
+ }
+
+ @Test
+ public void testHasRestrictionTracker() throws Exception {
+ DoFnSignature.ProcessElementMethod signature =
+ analyzeProcessElementMethod(
+ new AnonymousMethod() {
+ private void method(
+ DoFn<Integer, String>.ProcessContext context, SomeRestrictionTracker tracker) {}
+ });
+
+ assertTrue(signature.isSplittable());
+ assertTrue(signature.extraParameters().contains(DoFnSignature.Parameter.RESTRICTION_TRACKER));
+ assertEquals(SomeRestrictionTracker.class, signature.trackerT().getRawType());
+ }
+
+ @Test
+ public void testSplittableProcessElementMustNotHaveOtherParams() throws Exception {
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage("must not have any extra context arguments");
+ thrown.expectMessage("BOUNDED_WINDOW");
+
+ DoFnSignature.ProcessElementMethod signature =
+ analyzeProcessElementMethod(
+ new AnonymousMethod() {
+ private void method(
+ DoFn<Integer, String>.ProcessContext context,
+ SomeRestrictionTracker tracker,
+ BoundedWindow window) {}
+ });
+ }
+
+ @Test
+ public void testInfersBoundednessFromAnnotation() throws Exception {
+ class BaseSplittableFn extends DoFn<Integer, String> {
+ @ProcessElement
+ public void processElement(ProcessContext context, SomeRestrictionTracker tracker) {}
+
+ @GetInitialRestriction
+ public SomeRestriction getInitialRestriction(Integer element) {
+ return null;
+ }
+
+ @NewTracker
+ public SomeRestrictionTracker newTracker(SomeRestriction restriction) {
+ return null;
+ }
+ }
+
+ @BoundedPerElement
+ class BoundedSplittableFn extends BaseSplittableFn {}
+
+ @UnboundedPerElement
+ class UnboundedSplittableFn extends BaseSplittableFn {}
+
+ assertEquals(
+ PCollection.IsBounded.BOUNDED,
+ DoFnSignatures.INSTANCE
+ .getOrParseSignature(BaseSplittableFn.class)
+ .isBoundedPerElement());
+ assertEquals(
+ PCollection.IsBounded.BOUNDED,
+ DoFnSignatures.INSTANCE
+ .getOrParseSignature(BoundedSplittableFn.class)
+ .isBoundedPerElement());
+ assertEquals(
+ PCollection.IsBounded.UNBOUNDED,
+ DoFnSignatures.INSTANCE
+ .getOrParseSignature(UnboundedSplittableFn.class)
+ .isBoundedPerElement());
+ }
+
+ @Test
+ public void testUnsplittableIsBounded() throws Exception {
+ class UnsplittableFn extends DoFn<Integer, String> {
+ @ProcessElement
+ public void process(ProcessContext context) {}
+ }
+ assertEquals(
+ PCollection.IsBounded.BOUNDED,
+ DoFnSignatures.INSTANCE
+ .getOrParseSignature(UnsplittableFn.class)
+ .isBoundedPerElement());
+ }
+
+ private static class BaseFnWithContinuation extends DoFn<Integer, String> {
+ @ProcessElement
+ public ProcessContinuation processElement(
+ ProcessContext context, SomeRestrictionTracker tracker) {
+ return null;
+ }
+
+ @GetInitialRestriction
+ public SomeRestriction getInitialRestriction(Integer element) {
+ return null;
+ }
+
+ @NewTracker
+ public SomeRestrictionTracker newTracker(SomeRestriction restriction) {
+ return null;
+ }
+ }
+
+ @Test
+ public void testSplittableWithContinuationIsUnboundedByDefault() throws Exception {
+ // An unannotated fn whose @ProcessElement returns ProcessContinuation defaults to UNBOUNDED.
+ assertEquals(
+ PCollection.IsBounded.UNBOUNDED,
+ DoFnSignatures.INSTANCE.getOrParseSignature(BaseFnWithContinuation.class)
+ .isBoundedPerElement());
+ }
+
+ @Test
+ public void testSplittableRespectsBoundednessAnnotation() throws Exception {
+ @BoundedPerElement
+ class BoundedFnWithContinuation extends BaseFnWithContinuation {}
+
+ assertEquals(
+ PCollection.IsBounded.BOUNDED,
+ DoFnSignatures.INSTANCE
+ .getOrParseSignature(BoundedFnWithContinuation.class)
+ .isBoundedPerElement());
+
+ @UnboundedPerElement
+ class UnboundedFnWithContinuation extends BaseFnWithContinuation {}
+
+ assertEquals(
+ PCollection.IsBounded.UNBOUNDED,
+ DoFnSignatures.INSTANCE
+ .getOrParseSignature(UnboundedFnWithContinuation.class)
+ .isBoundedPerElement());
+ }
+
+ @Test
+ public void testUnsplittableButDeclaresBounded() throws Exception {
+ @BoundedPerElement
+ class SomeFn extends DoFn<Integer, String> {
+ @ProcessElement
+ public void process(ProcessContext context) {}
+ }
+
+ thrown.expectMessage("Non-splittable, but annotated as @Bounded");
+ DoFnSignatures.INSTANCE.getOrParseSignature(SomeFn.class);
+ }
+
+ @Test
+ public void testUnsplittableButDeclaresUnbounded() throws Exception {
+ @UnboundedPerElement
+ class SomeFn extends DoFn<Integer, String> {
+ @ProcessElement
+ public void process(ProcessContext context) {}
+ }
+
+ thrown.expectMessage("Non-splittable, but annotated as @Unbounded");
+ DoFnSignatures.INSTANCE.getOrParseSignature(SomeFn.class);
+ }
+
+ /** Tests a splittable {@link DoFn} that defines all methods in their full form, correctly. */
+ @Test
+ public void testSplittableWithAllFunctions() throws Exception {
+ class GoodSplittableDoFn extends DoFn<Integer, String> {
+ @ProcessElement
+ public ProcessContinuation processElement(
+ ProcessContext context, SomeRestrictionTracker tracker) {
+ return null;
+ }
+
+ @GetInitialRestriction
+ public SomeRestriction getInitialRestriction(Integer element) {
+ return null;
+ }
+
+ @SplitRestriction
+ public void splitRestriction(
+ Integer element, SomeRestriction restriction, OutputReceiver<SomeRestriction> receiver) {}
+
+ @NewTracker
+ public SomeRestrictionTracker newTracker(SomeRestriction restriction) {
+ return null;
+ }
+
+ @GetRestrictionCoder
+ public SomeRestrictionCoder getRestrictionCoder() {
+ return null;
+ }
+ }
+
+ DoFnSignature signature = DoFnSignatures.INSTANCE.getOrParseSignature(GoodSplittableDoFn.class);
+ assertEquals(SomeRestrictionTracker.class, signature.processElement().trackerT().getRawType());
+ assertTrue(signature.processElement().isSplittable());
+ assertTrue(signature.processElement().hasReturnValue());
+ assertEquals(
+ SomeRestriction.class, signature.getInitialRestriction().restrictionT().getRawType());
+ assertEquals(SomeRestriction.class, signature.splitRestriction().restrictionT().getRawType());
+ assertEquals(SomeRestrictionTracker.class, signature.newTracker().trackerT().getRawType());
+ assertEquals(SomeRestriction.class, signature.newTracker().restrictionT().getRawType());
+ assertEquals(SomeRestrictionCoder.class, signature.getRestrictionCoder().coderT().getRawType());
+ }
+
+ /**
+ * Tests a splittable {@link DoFn} that defines all methods in their full form, correctly, using
+ * generic types.
+ */
+ @Test
+ public void testSplittableWithAllFunctionsGeneric() throws Exception {
+ class GoodGenericSplittableDoFn<RestrictionT, TrackerT, CoderT> extends DoFn<Integer, String> {
+ @ProcessElement
+ public ProcessContinuation processElement(ProcessContext context, TrackerT tracker) {
+ return null;
+ }
+
+ @GetInitialRestriction
+ public RestrictionT getInitialRestriction(Integer element) {
+ return null;
+ }
+
+ @SplitRestriction
+ public void splitRestriction(
+ Integer element, RestrictionT restriction, OutputReceiver<RestrictionT> receiver) {}
+
+ @NewTracker
+ public TrackerT newTracker(RestrictionT restriction) {
+ return null;
+ }
+
+ @GetRestrictionCoder
+ public CoderT getRestrictionCoder() {
+ return null;
+ }
+ }
+
+ DoFnSignature signature =
+ DoFnSignatures.INSTANCE.getOrParseSignature(
+ new GoodGenericSplittableDoFn<
+ SomeRestriction, SomeRestrictionTracker, SomeRestrictionCoder>() {}.getClass());
+ assertEquals(SomeRestrictionTracker.class, signature.processElement().trackerT().getRawType());
+ assertTrue(signature.processElement().isSplittable());
+ assertTrue(signature.processElement().hasReturnValue());
+ assertEquals(
+ SomeRestriction.class, signature.getInitialRestriction().restrictionT().getRawType());
+ assertEquals(SomeRestriction.class, signature.splitRestriction().restrictionT().getRawType());
+ assertEquals(SomeRestrictionTracker.class, signature.newTracker().trackerT().getRawType());
+ assertEquals(SomeRestriction.class, signature.newTracker().restrictionT().getRawType());
+ assertEquals(SomeRestrictionCoder.class, signature.getRestrictionCoder().coderT().getRawType());
+ }
+
+ @Test
+ public void testSplittableMissingRequiredMethods() throws Exception {
+ class BadFn extends DoFn<Integer, String> {
+ @ProcessElement
+ public void process(ProcessContext context, SomeRestrictionTracker tracker) {}
+ }
+
+ thrown.expectMessage(
+ "Splittable, but does not define the following required methods: "
+ + "[@GetInitialRestriction, @NewTracker]");
+ DoFnSignatures.INSTANCE.getOrParseSignature(BadFn.class);
+ }
+
+ @Test
+ public void testNewTrackerReturnsWrongType() throws Exception {
+ class BadFn extends DoFn<Integer, String> {
+ @ProcessElement
+ public void process(ProcessContext context, SomeRestrictionTracker tracker) {}
+
+ @NewTracker
+ public void newTracker(SomeRestriction restriction) {}
+
+ @GetInitialRestriction
+ public SomeRestriction getInitialRestriction(Integer element) {
+ return null;
+ }
+ }
+
+ thrown.expectMessage(
+ "Returns void, but must return a subtype of RestrictionTracker<SomeRestriction>");
+ DoFnSignatures.INSTANCE.getOrParseSignature(BadFn.class);
+ }
+
+ @Test
+ public void testGetInitialRestrictionMismatchesNewTracker() throws Exception {
+ class BadFn extends DoFn<Integer, String> {
+ @ProcessElement
+ public void process(ProcessContext context, SomeRestrictionTracker tracker) {}
+
+ @NewTracker
+ public SomeRestrictionTracker newTracker(SomeRestriction restriction) {
+ return null;
+ }
+
+ @GetInitialRestriction
+ public String getInitialRestriction(Integer element) {
+ return null;
+ }
+ }
+
+ thrown.expectMessage(
+ "getInitialRestriction(Integer): Uses restriction type String, but @NewTracker method");
+ thrown.expectMessage("newTracker(SomeRestriction) uses restriction type SomeRestriction");
+ DoFnSignatures.INSTANCE.getOrParseSignature(BadFn.class);
+ }
+
+ @Test
+ public void testGetRestrictionCoderReturnsWrongType() throws Exception {
+ class BadFn extends DoFn<Integer, String> {
+ @ProcessElement
+ public void process(ProcessContext context, SomeRestrictionTracker tracker) {}
+
+ @NewTracker
+ public SomeRestrictionTracker newTracker(SomeRestriction restriction) {
+ return null;
+ }
+
+ @GetInitialRestriction
+ public SomeRestriction getInitialRestriction(Integer element) {
+ return null;
+ }
+
+ @GetRestrictionCoder
+ public KvCoder getRestrictionCoder() {
+ return null;
+ }
+ }
+
+ thrown.expectMessage(
+ "getRestrictionCoder() returns KvCoder which is not a subtype of Coder<SomeRestriction>");
+ DoFnSignatures.INSTANCE.getOrParseSignature(BadFn.class);
+ }
+
+ @Test
+ public void testSplitRestrictionWrongOutputReceiverType() throws Exception {
+ thrown.expectMessage(
+ "Third argument must be OutputReceiver<SomeRestriction>, but is OutputReceiver<String>");
+ DoFnSignatures.analyzeSplitRestrictionMethod(
+ errors(),
+ TypeToken.of(FakeDoFn.class),
+ new AnonymousMethod() {
+ void method(
+ Integer element, SomeRestriction restriction, DoFn.OutputReceiver<String> receiver) {}
+ }.getMethod(),
+ TypeToken.of(Integer.class));
+ }
+
+ @Test
+ public void testSplitRestrictionWrongElementArgument() throws Exception {
+ class BadFn {
+ private List<SomeRestriction> splitRestriction(String element, SomeRestriction restriction) {
+ return null;
+ }
+ }
+
+ thrown.expectMessage("First argument must be the element type Integer");
+ DoFnSignatures.analyzeSplitRestrictionMethod(
+ errors(),
+ TypeToken.of(FakeDoFn.class),
+ new AnonymousMethod() {
+ void method(
+ String element,
+ SomeRestriction restriction,
+ DoFn.OutputReceiver<SomeRestriction> receiver) {}
+ }.getMethod(),
+ TypeToken.of(Integer.class));
+ }
+
+ @Test
+ public void testSplitRestrictionWrongNumArguments() throws Exception {
+ thrown.expectMessage("Must have exactly 3 arguments");
+ DoFnSignatures.analyzeSplitRestrictionMethod(
+ errors(),
+ TypeToken.of(FakeDoFn.class),
+ new AnonymousMethod() {
+ private void method(
+ Integer element,
+ SomeRestriction restriction,
+ DoFn.OutputReceiver<SomeRestriction> receiver,
+ Object extra) {}
+ }.getMethod(),
+ TypeToken.of(Integer.class));
+ }
+
+ @Test
+ public void testSplitRestrictionConsistentButWrongType() throws Exception {
+ class OtherRestriction {}
+
+ class BadFn extends DoFn<Integer, String> {
+ @ProcessElement
+ public void process(ProcessContext context, SomeRestrictionTracker tracker) {}
+
+ @NewTracker
+ public SomeRestrictionTracker newTracker(SomeRestriction restriction) {
+ return null;
+ }
+
+ @GetInitialRestriction
+ public SomeRestriction getInitialRestriction(Integer element) {
+ return null;
+ }
+
+ @DoFn.SplitRestriction
+ public void splitRestriction(
+ Integer element,
+ OtherRestriction restriction,
+ OutputReceiver<OtherRestriction> receiver) {}
+ }
+
+ thrown.expectMessage(
+ "getInitialRestriction(Integer): Uses restriction type SomeRestriction, "
+ + "but @SplitRestriction method ");
+ thrown.expectMessage(
+ "splitRestriction(Integer, OtherRestriction, OutputReceiver) "
+ + "uses restriction type OtherRestriction");
+ DoFnSignatures.INSTANCE.getOrParseSignature(BadFn.class);
+ }
+
+ @Test
+ public void testUnsplittableMustNotDefineExtraMethods() throws Exception {
+ class BadFn extends DoFn<Integer, String> {
+ @ProcessElement
+ public void processElement(ProcessContext context) {}
+
+ @GetInitialRestriction
+ public SomeRestriction getInitialRestriction(Integer element) {
+ return null;
+ }
+
+ @SplitRestriction
+ public void splitRestriction(
+ Integer element, SomeRestriction restriction, OutputReceiver<SomeRestriction> receiver) {}
+
+ @NewTracker
+ public SomeRestrictionTracker newTracker(SomeRestriction restriction) {
+ return null;
+ }
+
+ @GetRestrictionCoder
+ public SomeRestrictionCoder getRestrictionCoder() {
+ return null;
+ }
+ }
+
+ thrown.expectMessage(
+ "Non-splittable, but defines methods: "
+ + "[@GetInitialRestriction, @SplitRestriction, @NewTracker, @GetRestrictionCoder]");
+ DoFnSignatures.INSTANCE.getOrParseSignature(BadFn.class);
+ }
+
+ @Test
+ public void testNewTrackerWrongNumArguments() throws Exception {
+ thrown.expectMessage("Must have a single argument");
+ DoFnSignatures.analyzeNewTrackerMethod(
+ errors(),
+ TypeToken.of(FakeDoFn.class),
+ new AnonymousMethod() {
+ private SomeRestrictionTracker method(SomeRestriction restriction, Object extra) {
+ return null;
+ }
+ }.getMethod());
+ }
+
+ @Test
+ public void testNewTrackerInconsistent() throws Exception {
+ thrown.expectMessage(
+ "Returns SomeRestrictionTracker, but must return a subtype of RestrictionTracker<String>");
+ DoFnSignatures.analyzeNewTrackerMethod(
+ errors(),
+ TypeToken.of(FakeDoFn.class),
+ new AnonymousMethod() {
+ private SomeRestrictionTracker method(String restriction) {
+ return null;
+ }
+ }.getMethod());
+ }
+}
[2/4] incubator-beam git commit: [BEAM-65] SplittableDoFn prototype.
Posted by bc...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0a24883/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
index 3eee74a..f671a67 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnAdapters.java
@@ -25,6 +25,7 @@ import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowingInternals;
@@ -37,8 +38,8 @@ import org.joda.time.Instant;
/**
* Utility class containing adapters to/from {@link DoFn} and {@link OldDoFn}.
*
- * @deprecated This class will go away when we start running {@link DoFn}'s directly (using
- * {@link DoFnInvoker}) rather than via {@link OldDoFn}.
+ * @deprecated This class will go away when we start running {@link DoFn}'s directly (using {@link
+ * DoFnInvoker}) rather than via {@link OldDoFn}.
*/
@Deprecated
public class DoFnAdapters {
@@ -176,6 +177,18 @@ public class DoFnAdapters {
}
/**
+ * Unwraps an adapter produced by {@link #toOldDoFn}, returning the {@link DoFn} it wraps. For any
+ * other {@link OldDoFn}, returns {@code null}.
+ */
+ public static <InputT, OutputT> DoFn<InputT, OutputT> getDoFn(OldDoFn<InputT, OutputT> fn) {
+   // Only SimpleDoFnAdapter (and its subclasses) hold a reference to an original DoFn.
+   return (fn instanceof SimpleDoFnAdapter)
+       ? ((SimpleDoFnAdapter<InputT, OutputT>) fn).fn
+       : null;
+ }
+
+ /**
* Wraps a {@link DoFn} that doesn't require access to {@link BoundedWindow} as an {@link
* OldDoFn}.
*/
@@ -324,6 +337,11 @@ public class DoFnAdapters {
public DoFn.OutputReceiver<OutputT> outputReceiver() {
throw new UnsupportedOperationException("outputReceiver() exists only for testing");
}
+
+ // This adapter never provides a RestrictionTracker (it only supports non-splittable fns),
+ // so requesting one always throws UnsupportedOperationException.
+ @Override
+ public <RestrictionT> RestrictionTracker<RestrictionT> restrictionTracker() {
+ throw new UnsupportedOperationException("This is a non-splittable DoFn");
+ }
}
/**
@@ -412,5 +430,10 @@ public class DoFnAdapters {
public DoFn.OutputReceiver<OutputT> outputReceiver() {
throw new UnsupportedOperationException("outputReceiver() exists only for testing");
}
+
+ // This adapter never provides a RestrictionTracker (it only supports non-splittable fns),
+ // so requesting one always throws UnsupportedOperationException.
+ @Override
+ public <RestrictionT> RestrictionTracker<RestrictionT> restrictionTracker() {
+ throw new UnsupportedOperationException("This is a non-splittable DoFn");
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0a24883/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
index 11a4cbd..302bb02 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
@@ -46,7 +46,9 @@ import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingInternals;
import org.apache.beam.sdk.util.state.InMemoryStateInternals;
+import org.apache.beam.sdk.util.state.InMemoryTimerInternals;
import org.apache.beam.sdk.util.state.StateInternals;
+import org.apache.beam.sdk.util.state.TimerCallback;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TupleTag;
@@ -222,8 +224,11 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
if (state == State.UNINITIALIZED) {
initializeState();
}
- TestContext<InputT, OutputT> context = createContext(fn);
+ TestContext context = createContext(fn);
context.setupDelegateAggregators();
+ // State and timer internals are per-bundle.
+ stateInternals = InMemoryStateInternals.forKey(new Object());
+ timerInternals = new InMemoryTimerInternals();
try {
fn.startBundle(context);
} catch (UserCodeException e) {
@@ -460,6 +465,35 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
return extractAggregatorValue(agg.getName(), agg.getCombineFn());
}
+ private static TimerCallback collectInto(final List<TimerInternals.TimerData> firedTimers) {
+ return new TimerCallback() {
+ @Override
+ public void onTimer(TimerInternals.TimerData timer) throws Exception {
+ firedTimers.add(timer);
+ }
+ };
+ }
+
+ public List<TimerInternals.TimerData> advanceInputWatermark(Instant newWatermark) {
+ try {
+ final List<TimerInternals.TimerData> firedTimers = new ArrayList<>();
+ timerInternals.advanceInputWatermark(collectInto(firedTimers), newWatermark);
+ return firedTimers;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public List<TimerInternals.TimerData> advanceProcessingTime(Instant newProcessingTime) {
+ try {
+ final List<TimerInternals.TimerData> firedTimers = new ArrayList<>();
+ timerInternals.advanceProcessingTime(collectInto(firedTimers), newProcessingTime);
+ return firedTimers;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
private <AccumT, AggregateT> AggregateT extractAggregatorValue(
String name, CombineFn<?, AccumT, AggregateT> combiner) {
@SuppressWarnings("unchecked")
@@ -476,41 +510,27 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
return MoreObjects.firstNonNull(elems, Collections.<WindowedValue<T>>emptyList());
}
- private TestContext<InputT, OutputT> createContext(OldDoFn<InputT, OutputT> fn) {
- return new TestContext<>(fn, options, mainOutputTag, outputs, accumulators);
+ private TestContext createContext(OldDoFn<InputT, OutputT> fn) {
+ return new TestContext();
}
- private static class TestContext<InT, OutT> extends OldDoFn<InT, OutT>.Context {
- private final PipelineOptions opts;
- private final TupleTag<OutT> mainOutputTag;
- private final Map<TupleTag<?>, List<WindowedValue<?>>> outputs;
- private final Map<String, Object> accumulators;
-
- public TestContext(
- OldDoFn<InT, OutT> fn,
- PipelineOptions opts,
- TupleTag<OutT> mainOutputTag,
- Map<TupleTag<?>, List<WindowedValue<?>>> outputs,
- Map<String, Object> accumulators) {
+ private class TestContext extends OldDoFn<InputT, OutputT>.Context {
+ TestContext() {
fn.super();
- this.opts = opts;
- this.mainOutputTag = mainOutputTag;
- this.outputs = outputs;
- this.accumulators = accumulators;
}
@Override
public PipelineOptions getPipelineOptions() {
- return opts;
+ return options;
}
@Override
- public void output(OutT output) {
+ public void output(OutputT output) {
sideOutput(mainOutputTag, output);
}
@Override
- public void outputWithTimestamp(OutT output, Instant timestamp) {
+ public void outputWithTimestamp(OutputT output, Instant timestamp) {
sideOutputWithTimestamp(mainOutputTag, output, timestamp);
}
@@ -570,40 +590,27 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
}
}
- private TestProcessContext<InputT, OutputT> createProcessContext(
+ private TestProcessContext createProcessContext(
OldDoFn<InputT, OutputT> fn,
TimestampedValue<InputT> elem) {
WindowedValue<InputT> windowedValue = WindowedValue.timestampedValueInGlobalWindow(
elem.getValue(), elem.getTimestamp());
- return new TestProcessContext<>(fn,
- createContext(fn),
- windowedValue,
- mainOutputTag,
- sideInputs);
- }
-
- private static class TestProcessContext<InT, OutT> extends OldDoFn<InT, OutT>.ProcessContext {
- private final TestContext<InT, OutT> context;
- private final TupleTag<OutT> mainOutputTag;
- private final WindowedValue<InT> element;
- private final Map<PCollectionView<?>, Map<BoundedWindow, ?>> sideInputs;
-
- private TestProcessContext(
- OldDoFn<InT, OutT> fn,
- TestContext<InT, OutT> context,
- WindowedValue<InT> element,
- TupleTag<OutT> mainOutputTag,
- Map<PCollectionView<?>, Map<BoundedWindow, ?>> sideInputs) {
+ return new TestProcessContext(windowedValue);
+ }
+
+ private class TestProcessContext extends OldDoFn<InputT, OutputT>.ProcessContext {
+ private final TestContext context;
+ private final WindowedValue<InputT> element;
+
+ private TestProcessContext(WindowedValue<InputT> element) {
fn.super();
- this.context = context;
+ this.context = createContext(fn);
this.element = element;
- this.mainOutputTag = mainOutputTag;
- this.sideInputs = sideInputs;
}
@Override
- public InT element() {
+ public InputT element() {
return element.getValue();
}
@@ -638,10 +645,8 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
}
@Override
- public WindowingInternals<InT, OutT> windowingInternals() {
- return new WindowingInternals<InT, OutT>() {
- StateInternals<?> stateInternals = InMemoryStateInternals.forKey(new Object());
-
+ public WindowingInternals<InputT, OutputT> windowingInternals() {
+ return new WindowingInternals<InputT, OutputT>() {
@Override
public StateInternals<?> stateInternals() {
return stateInternals;
@@ -649,7 +654,7 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
@Override
public void outputWindowedValue(
- OutT output,
+ OutputT output,
Instant timestamp,
Collection<? extends BoundedWindow> windows,
PaneInfo pane) {
@@ -658,8 +663,7 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
@Override
public TimerInternals timerInternals() {
- throw
- new UnsupportedOperationException("Timer Internals are not supported in DoFnTester");
+ return timerInternals;
}
@Override
@@ -695,12 +699,12 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
}
@Override
- public void output(OutT output) {
+ public void output(OutputT output) {
sideOutput(mainOutputTag, output);
}
@Override
- public void outputWithTimestamp(OutT output, Instant timestamp) {
+ public void outputWithTimestamp(OutputT output, Instant timestamp) {
sideOutputWithTimestamp(mainOutputTag, output, timestamp);
}
@@ -774,6 +778,9 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
/** The outputs from the {@link DoFn} under test. */
private Map<TupleTag<?>, List<WindowedValue<?>>> outputs;
+ /** In-memory state for the current bundle; a fresh instance is assigned whenever a bundle starts. */
+ private InMemoryStateInternals<?> stateInternals;
+ /** In-memory timers for the current bundle; a fresh instance is assigned whenever a bundle starts. */
+ private InMemoryTimerInternals timerInternals;
+
/** The state of processing of the {@link DoFn} under test. */
private State state = State.UNINITIALIZED;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0a24883/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
index 2443d8e..fdef908 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
@@ -17,6 +17,8 @@
*/
package org.apache.beam.sdk.transforms;
+import static com.google.common.base.Preconditions.checkArgument;
+
import com.google.common.collect.ImmutableList;
import java.io.Serializable;
import java.util.Arrays;
@@ -27,6 +29,7 @@ import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.DisplayData.Builder;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.util.StringUtils;
@@ -716,6 +719,8 @@ public class ParDo {
@Override
public PCollection<OutputT> apply(PCollection<? extends InputT> input) {
+ checkArgument(
+ !isSplittable(fn), "Splittable DoFn not supported by the current runner");
return PCollection.<OutputT>createPrimitiveOutputInternal(
input.getPipeline(),
input.getWindowingStrategy(),
@@ -925,6 +930,9 @@ public class ParDo {
@Override
public PCollectionTuple apply(PCollection<? extends InputT> input) {
+ checkArgument(
+ !isSplittable(fn), "Splittable DoFn not supported by the current runner");
+
PCollectionTuple outputs = PCollectionTuple.ofPrimitiveOutputsInternal(
input.getPipeline(),
TupleTagList.of(mainOutputTag).and(sideOutputTags.getAll()),
@@ -997,4 +1005,15 @@ public class ParDo {
.add(DisplayData.item("fn", fnClass)
.withLabel("Transform Function"));
}
+
+ /**
+  * Returns whether {@code oldDoFn} wraps a {@link DoFn} whose {@link DoFn.ProcessElement} method
+  * makes it splittable. Fns for which {@link DoFnAdapters#getDoFn} returns {@code null} are never
+  * considered splittable.
+  */
+ private static boolean isSplittable(OldDoFn<?, ?> oldDoFn) {
+   DoFn<?, ?> doFn = DoFnAdapters.getDoFn(oldDoFn);
+   if (doFn != null) {
+     DoFnSignature.ProcessElementMethod processElement =
+         DoFnSignatures.INSTANCE.getOrParseSignature(doFn.getClass()).processElement();
+     return processElement.isSplittable();
+   }
+   return false;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0a24883/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
index eb6961c..9672d53 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
@@ -17,7 +17,10 @@
*/
package org.apache.beam.sdk.transforms.reflect;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
/**
* Interface for invoking the {@code DoFn} processing methods.
@@ -43,7 +46,28 @@ public interface DoFnInvoker<InputT, OutputT> {
*
* @param c The {@link DoFn.ProcessContext} to invoke the fn with.
* @param extra Factory for producing extra parameter objects (such as window), if necessary.
+ * @return The {@link DoFn.ProcessContinuation} returned by the underlying method, or {@link
+ * DoFn.ProcessContinuation#stop()} if it returns {@code void}.
*/
- void invokeProcessElement(
+ DoFn.ProcessContinuation invokeProcessElement(
DoFn<InputT, OutputT>.ProcessContext c, DoFn.ExtraContextFactory<InputT, OutputT> extra);
+
+ /**
+ * Invoke the {@link DoFn.GetInitialRestriction} method on the bound {@link DoFn}.
+ *
+ * @param element the input element whose initial restriction is requested
+ * @return the restriction produced by the user's method
+ */
+ <RestrictionT> RestrictionT invokeGetInitialRestriction(InputT element);
+
+ /**
+ * Invoke the {@link DoFn.GetRestrictionCoder} method on the bound {@link DoFn}. Called only
+ * during pipeline construction time.
+ *
+ * @param coderRegistry registry that an implementation may consult to infer a coder when the
+ *     {@link DoFn} does not declare a {@link DoFn.GetRestrictionCoder} method
+ * @return the coder for the {@link DoFn}'s restriction type
+ */
+ <RestrictionT> Coder<RestrictionT> invokeGetRestrictionCoder(CoderRegistry coderRegistry);
+
+ /**
+ * Invoke the {@link DoFn.SplitRestriction} method on the bound {@link DoFn}.
+ *
+ * @param element the input element the restriction belongs to
+ * @param restriction the restriction to split
+ * @param restrictionReceiver receives each resulting restriction; an implementation may emit
+ *     {@code restriction} unchanged when the {@link DoFn} declares no split method
+ */
+ <RestrictionT> void invokeSplitRestriction(
+ InputT element,
+ RestrictionT restriction,
+ DoFn.OutputReceiver<RestrictionT> restrictionReceiver);
+
+ /**
+ * Invoke the {@link DoFn.NewTracker} method on the bound {@link DoFn}.
+ *
+ * @param restriction the restriction to be tracked
+ * @return a new tracker for {@code restriction}
+ */
+ <RestrictionT, TrackerT extends RestrictionTracker<RestrictionT>> TrackerT invokeNewTracker(
+ RestrictionT restriction);
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0a24883/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
index da88587..fd057c3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.transforms.reflect;
import static com.google.common.base.Preconditions.checkArgument;
+import com.google.common.reflect.TypeToken;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
@@ -26,6 +27,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumMap;
import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import net.bytebuddy.ByteBuddy;
@@ -35,10 +37,12 @@ import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.description.modifier.FieldManifestation;
import net.bytebuddy.description.modifier.Visibility;
import net.bytebuddy.description.type.TypeDescription;
+import net.bytebuddy.description.type.TypeList;
import net.bytebuddy.dynamic.DynamicType;
import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
import net.bytebuddy.dynamic.scaffold.InstrumentedType;
import net.bytebuddy.dynamic.scaffold.subclass.ConstructorStrategy;
+import net.bytebuddy.implementation.ExceptionMethod;
import net.bytebuddy.implementation.FixedValue;
import net.bytebuddy.implementation.Implementation;
import net.bytebuddy.implementation.Implementation.Context;
@@ -48,6 +52,7 @@ import net.bytebuddy.implementation.bytecode.ByteCodeAppender;
import net.bytebuddy.implementation.bytecode.StackManipulation;
import net.bytebuddy.implementation.bytecode.Throw;
import net.bytebuddy.implementation.bytecode.assign.Assigner;
+import net.bytebuddy.implementation.bytecode.assign.TypeCasting;
import net.bytebuddy.implementation.bytecode.member.FieldAccess;
import net.bytebuddy.implementation.bytecode.member.MethodInvocation;
import net.bytebuddy.implementation.bytecode.member.MethodReturn;
@@ -57,12 +62,17 @@ import net.bytebuddy.jar.asm.MethodVisitor;
import net.bytebuddy.jar.asm.Opcodes;
import net.bytebuddy.jar.asm.Type;
import net.bytebuddy.matcher.ElementMatchers;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.ExtraContextFactory;
import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
import org.apache.beam.sdk.transforms.DoFnAdapters;
import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.util.UserCodeException;
+import org.apache.beam.sdk.values.TypeDescriptor;
/** Dynamically generates {@link DoFnInvoker} instances for invoking a {@link DoFn}. */
public class DoFnInvokers {
@@ -81,10 +91,10 @@ public class DoFnInvokers {
private DoFnInvokers() {}
/**
- * Creates a {@link DoFnInvoker} for the given {@link Object}, which should be either a
- * {@link DoFn} or an {@link OldDoFn}. The expected use would be to deserialize a user's
- * function as an {@link Object} and then pass it to this method, so there is no need to
- * statically specify what sort of object it is.
+ * Creates a {@link DoFnInvoker} for the given {@link Object}, which should be either a {@link
+ * DoFn} or an {@link OldDoFn}. The expected use would be to deserialize a user's function as an
+ * {@link Object} and then pass it to this method, so there is no need to statically specify what
+ * sort of object it is.
*
* @deprecated this is to be used only as a migration path for decoupling upgrades
*/
@@ -92,15 +102,16 @@ public class DoFnInvokers {
public DoFnInvoker<?, ?> invokerFor(Object deserializedFn) {
if (deserializedFn instanceof DoFn) {
return newByteBuddyInvoker((DoFn<?, ?>) deserializedFn);
- } else if (deserializedFn instanceof OldDoFn){
+ } else if (deserializedFn instanceof OldDoFn) {
return new OldDoFnInvoker<>((OldDoFn<?, ?>) deserializedFn);
} else {
- throw new IllegalArgumentException(String.format(
- "Cannot create a %s for %s; it should be either a %s or an %s.",
- DoFnInvoker.class.getSimpleName(),
- deserializedFn.toString(),
- DoFn.class.getSimpleName(),
- OldDoFn.class.getSimpleName()));
+ throw new IllegalArgumentException(
+ String.format(
+ "Cannot create a %s for %s; it should be either a %s or an %s.",
+ DoFnInvoker.class.getSimpleName(),
+ deserializedFn.toString(),
+ DoFn.class.getSimpleName(),
+ OldDoFn.class.getSimpleName()));
}
}
@@ -113,12 +124,13 @@ public class DoFnInvokers {
}
@Override
- public void invokeProcessElement(
+ public DoFn.ProcessContinuation invokeProcessElement(
DoFn<InputT, OutputT>.ProcessContext c, ExtraContextFactory<InputT, OutputT> extra) {
OldDoFn<InputT, OutputT>.ProcessContext oldCtx =
DoFnAdapters.adaptProcessContext(fn, c, extra);
try {
fn.processElement(oldCtx);
+ return DoFn.ProcessContinuation.stop();
} catch (Throwable exc) {
throw UserCodeException.wrap(exc);
}
@@ -161,14 +173,37 @@ public class DoFnInvokers {
throw UserCodeException.wrap(exc);
}
}
+
+ // OldDoFn has no splittable-DoFn methods (initial restriction, restriction coder, splitting,
+ // tracker creation), so each of the following always throws UnsupportedOperationException.
+ @Override
+ public <RestrictionT> RestrictionT invokeGetInitialRestriction(InputT element) {
+ throw new UnsupportedOperationException("OldDoFn is not splittable");
+ }
+
+ @Override
+ public <RestrictionT> Coder<RestrictionT> invokeGetRestrictionCoder(
+ CoderRegistry coderRegistry) {
+ throw new UnsupportedOperationException("OldDoFn is not splittable");
+ }
+
+ @Override
+ public <RestrictionT> void invokeSplitRestriction(
+ InputT element, RestrictionT restriction, DoFn.OutputReceiver<RestrictionT> receiver) {
+ throw new UnsupportedOperationException("OldDoFn is not splittable");
+ }
+
+ @Override
+ public <RestrictionT, TrackerT extends RestrictionTracker<RestrictionT>>
+ TrackerT invokeNewTracker(RestrictionT restriction) {
+ throw new UnsupportedOperationException("OldDoFn is not splittable");
+ }
}
/** @return the {@link DoFnInvoker} for the given {@link DoFn}. */
@SuppressWarnings({"unchecked", "rawtypes"})
public <InputT, OutputT> DoFnInvoker<InputT, OutputT> newByteBuddyInvoker(
DoFn<InputT, OutputT> fn) {
- return newByteBuddyInvoker(DoFnSignatures.INSTANCE.getOrParseSignature(
- (Class) fn.getClass()), fn);
+ return newByteBuddyInvoker(
+ DoFnSignatures.INSTANCE.getOrParseSignature((Class) fn.getClass()), fn);
}
/** @return the {@link DoFnInvoker} for the given {@link DoFn}. */
@@ -214,6 +249,32 @@ public class DoFnInvokers {
return constructor;
}
+ /**
+ * Default implementation of {@link DoFn.SplitRestriction}, used through ByteBuddy's
+ * {@code MethodDelegation} when the {@link DoFn} declares no {@code @SplitRestriction} method.
+ */
+ public static class DefaultSplitRestriction {
+ /**
+ * Doesn't split the restriction: emits {@code restriction} unchanged, once, to {@code receiver}.
+ * {@code element} is ignored. Appears unused to static analysis because it is only bound by
+ * generated bytecode; keep the signature compatible with {@code invokeSplitRestriction}.
+ */
+ @SuppressWarnings("unused")
+ public static <InputT, RestrictionT> void invokeSplitRestriction(
+ InputT element, RestrictionT restriction, DoFn.OutputReceiver<RestrictionT> receiver) {
+ receiver.output(restriction);
+ }
+ }
+
+ /**
+ * Default implementation of {@link DoFn.GetRestrictionCoder}, used through ByteBuddy's
+ * {@code MethodDelegation} when a splittable {@link DoFn} declares no
+ * {@code @GetRestrictionCoder} method.
+ */
+ public static class DefaultRestrictionCoder {
+ /** The restriction type, as reported by the {@code @GetInitialRestriction} method's signature. */
+ private final TypeToken<?> restrictionType;
+
+ DefaultRestrictionCoder(TypeToken<?> restrictionType) {
+ this.restrictionType = restrictionType;
+ }
+
+ /**
+ * Infers a coder for the restriction type by looking it up in {@code registry}.
+ *
+ * @throws CannotProvideCoderException propagated from {@code registry.getCoder} when no coder
+ *     can be inferred for the restriction type
+ */
+ @SuppressWarnings({"unused", "unchecked"})
+ public <RestrictionT> Coder<RestrictionT> invokeGetRestrictionCoder(CoderRegistry registry)
+ throws CannotProvideCoderException {
+ return (Coder) registry.getCoder(TypeDescriptor.of(restrictionType.getType()));
+ }
+ }
+
/** Generates a {@link DoFnInvoker} class for the given {@link DoFnSignature}. */
private static Class<? extends DoFnInvoker<?, ?>> generateInvokerClass(DoFnSignature signature) {
Class<? extends DoFn<?, ?>> fnClass = signature.fnClass();
@@ -247,7 +308,15 @@ public class DoFnInvokers {
.method(ElementMatchers.named("invokeSetup"))
.intercept(delegateOrNoop(signature.setup()))
.method(ElementMatchers.named("invokeTeardown"))
- .intercept(delegateOrNoop(signature.teardown()));
+ .intercept(delegateOrNoop(signature.teardown()))
+ .method(ElementMatchers.named("invokeGetInitialRestriction"))
+ .intercept(delegateWithDowncastOrThrow(signature.getInitialRestriction()))
+ .method(ElementMatchers.named("invokeSplitRestriction"))
+ .intercept(splitRestrictionDelegation(signature))
+ .method(ElementMatchers.named("invokeGetRestrictionCoder"))
+ .intercept(getRestrictionCoderDelegation(signature))
+ .method(ElementMatchers.named("invokeNewTracker"))
+ .intercept(delegateWithDowncastOrThrow(signature.newTracker()));
DynamicType.Unloaded<?> unloaded = builder.make();
@@ -260,6 +329,28 @@ public class DoFnInvokers {
return res;
}
+ /**
+  * Chooses the implementation of {@code invokeGetRestrictionCoder}: throw for non-splittable fns,
+  * otherwise delegate to the user's {@code @GetRestrictionCoder} method, or infer a coder from the
+  * {@code @GetInitialRestriction} restriction type when none is declared.
+  */
+ private static Implementation getRestrictionCoderDelegation(DoFnSignature signature) {
+   if (!signature.processElement().isSplittable()) {
+     return ExceptionMethod.throwing(UnsupportedOperationException.class);
+   }
+   if (signature.getRestrictionCoder() != null) {
+     return new DowncastingParametersMethodDelegation(
+         signature.getRestrictionCoder().targetMethod());
+   }
+   return MethodDelegation.to(
+       new DefaultRestrictionCoder(signature.getInitialRestriction().restrictionT()));
+ }
+
+ /**
+  * Chooses the implementation of {@code invokeSplitRestriction}: the user's
+  * {@code @SplitRestriction} method if declared, otherwise a default that doesn't split.
+  */
+ private static Implementation splitRestrictionDelegation(DoFnSignature signature) {
+   return (signature.splitRestriction() != null)
+       ? new DowncastingParametersMethodDelegation(signature.splitRestriction().targetMethod())
+       : MethodDelegation.to(DefaultSplitRestriction.class);
+ }
+
/** Delegates to the given method if available, or does nothing. */
private static Implementation delegateOrNoop(DoFnSignature.DoFnMethod method) {
return (method == null)
@@ -267,6 +358,13 @@ public class DoFnInvokers {
: new DoFnMethodDelegation(method.targetMethod());
}
+ /**
+  * Delegates to {@code method} (downcasting arguments) when the {@link DoFn} declares it; otherwise
+  * produces an implementation that throws {@link UnsupportedOperationException}.
+  */
+ private static Implementation delegateWithDowncastOrThrow(DoFnSignature.DoFnMethod method) {
+   if (method != null) {
+     return new DowncastingParametersMethodDelegation(method.targetMethod());
+   }
+   return ExceptionMethod.throwing(UnsupportedOperationException.class);
+ }
+
/**
* Implements a method of {@link DoFnInvoker} (the "instrumented method") by delegating to a
* "target method" of the wrapped {@link DoFn}.
@@ -374,12 +472,37 @@ public class DoFnInvokers {
}
/**
+ * Implements a {@link DoFnInvoker} method by delegating to a target method of the {@link DoFn}:
+ * passes the instrumented method's arguments through in order, downcasting each argument whose
+ * target parameter type is non-primitive to that declared type.
+ *
+ * <p>The invoker's methods are generic, so after erasure their arguments are typed more loosely
+ * than the user's method expects; the casts let the generated bytecode call the user's method.
+ */
+ private static class DowncastingParametersMethodDelegation extends DoFnMethodDelegation {
+ DowncastingParametersMethodDelegation(Method method) {
+ super(method);
+ }
+
+ @Override
+ protected StackManipulation beforeDelegation(MethodDescription instrumentedMethod) {
+ List<StackManipulation> pushParameters = new ArrayList<>();
+ TypeList.Generic paramTypes = targetMethod.getParameters().asTypeList();
+ // Local slot 0 is "this", so argument i is read from slot i + 1, using a load instruction
+ // chosen from the TARGET parameter's type.
+ // NOTE(review): this assumes every argument occupies a single slot and that the instrumented
+ // method's arguments line up 1:1 with the target's; a primitive (especially long/double)
+ // target parameter would break that — confirm signature validation rules those out.
+ for (int i = 0; i < paramTypes.size(); i++) {
+ TypeDescription.Generic paramT = paramTypes.get(i);
+ pushParameters.add(MethodVariableAccess.of(paramT).loadOffset(i + 1));
+ if (!paramT.isPrimitive()) {
+ pushParameters.add(TypeCasting.to(paramT));
+ }
+ }
+ return new StackManipulation.Compound(pushParameters);
+ }
+ }
+
+ /**
* Implements the invoker's {@link DoFnInvoker#invokeProcessElement} method by delegating to the
* {@link DoFn.ProcessElement} method.
*/
private static final class ProcessElementDelegation extends DoFnMethodDelegation {
private static final Map<DoFnSignature.Parameter, MethodDescription>
EXTRA_CONTEXT_FACTORY_METHODS;
+ private static final MethodDescription PROCESS_CONTINUATION_STOP_METHOD;
static {
try {
@@ -397,11 +520,21 @@ public class DoFnInvokers {
DoFnSignature.Parameter.OUTPUT_RECEIVER,
new MethodDescription.ForLoadedMethod(
DoFn.ExtraContextFactory.class.getMethod("outputReceiver")));
+ methods.put(
+ DoFnSignature.Parameter.RESTRICTION_TRACKER,
+ new MethodDescription.ForLoadedMethod(
+ DoFn.ExtraContextFactory.class.getMethod("restrictionTracker")));
EXTRA_CONTEXT_FACTORY_METHODS = Collections.unmodifiableMap(methods);
} catch (Exception e) {
throw new RuntimeException(
"Failed to locate an ExtraContextFactory method that was expected to exist", e);
}
+ // Resolved once so generated invokers can return ProcessContinuation.stop() on behalf of
+ // @ProcessElement methods that return void.
+ try {
+   PROCESS_CONTINUATION_STOP_METHOD =
+       new MethodDescription.ForLoadedMethod(DoFn.ProcessContinuation.class.getMethod("stop"));
+ } catch (NoSuchMethodException e) {
+   // Preserve the cause so the underlying reflection failure stays visible.
+   throw new RuntimeException("Failed to locate ProcessContinuation.stop()", e);
+ }
}
private final DoFnSignature.ProcessElementMethod signature;
@@ -427,14 +560,26 @@ public class DoFnInvokers {
parameters.add(
new StackManipulation.Compound(
pushExtraContextFactory,
- MethodInvocation.invoke(EXTRA_CONTEXT_FACTORY_METHODS.get(param))));
+ MethodInvocation.invoke(EXTRA_CONTEXT_FACTORY_METHODS.get(param)),
+ // ExtraContextFactory.restrictionTracker() returns a RestrictionTracker,
+ // but the @ProcessElement method expects a concrete subtype of it.
+ // Insert a downcast.
+ (param == DoFnSignature.Parameter.RESTRICTION_TRACKER)
+ ? TypeCasting.to(
+ new TypeDescription.ForLoadedType(signature.trackerT().getRawType()))
+ : StackManipulation.Trivial.INSTANCE));
}
return new StackManipulation.Compound(parameters);
}
@Override
protected StackManipulation afterDelegation(MethodDescription instrumentedMethod) {
- return MethodReturn.VOID;
+ if (TypeDescription.VOID.equals(targetMethod.getReturnType().asErasure())) {
+ return new StackManipulation.Compound(
+ MethodInvocation.invoke(PROCESS_CONTINUATION_STOP_METHOD), MethodReturn.REFERENCE);
+ } else {
+ return MethodReturn.returning(targetMethod.getReturnType().asErasure());
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0a24883/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
index 756df07..632f817 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
@@ -18,11 +18,16 @@
package org.apache.beam.sdk.transforms.reflect;
import com.google.auto.value.AutoValue;
+import com.google.common.reflect.TypeToken;
import java.lang.reflect.Method;
import java.util.Collections;
import java.util.List;
import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.values.PCollection;
/**
* Describes the signature of a {@link DoFn}, in particular, which features it uses, which extra
@@ -35,6 +40,9 @@ public abstract class DoFnSignature {
/** Class of the original {@link DoFn} from which this signature was produced. */
public abstract Class<? extends DoFn<?, ?>> fnClass();
+ /** Whether this {@link DoFn} does a bounded amount of work per element. */
+ public abstract PCollection.IsBounded isBoundedPerElement();
+
/** Details about this {@link DoFn}'s {@link DoFn.ProcessElement} method. */
public abstract ProcessElementMethod processElement();
@@ -54,6 +62,22 @@ public abstract class DoFnSignature {
@Nullable
public abstract LifecycleMethod teardown();
+ /** Details about this {@link DoFn}'s {@link DoFn.GetInitialRestriction} method. */
+ @Nullable
+ public abstract GetInitialRestrictionMethod getInitialRestriction();
+
+ /** Details about this {@link DoFn}'s {@link DoFn.SplitRestriction} method. */
+ @Nullable
+ public abstract SplitRestrictionMethod splitRestriction();
+
+ /** Details about this {@link DoFn}'s {@link DoFn.GetRestrictionCoder} method. */
+ @Nullable
+ public abstract GetRestrictionCoderMethod getRestrictionCoder();
+
+ /** Details about this {@link DoFn}'s {@link DoFn.NewTracker} method. */
+ @Nullable
+ public abstract NewTrackerMethod newTracker();
+
static Builder builder() {
return new AutoValue_DoFnSignature.Builder();
}
@@ -61,11 +85,16 @@ public abstract class DoFnSignature {
@AutoValue.Builder
abstract static class Builder {
abstract Builder setFnClass(Class<? extends DoFn<?, ?>> fnClass);
+ abstract Builder setIsBoundedPerElement(PCollection.IsBounded isBounded);
abstract Builder setProcessElement(ProcessElementMethod processElement);
abstract Builder setStartBundle(BundleMethod startBundle);
abstract Builder setFinishBundle(BundleMethod finishBundle);
abstract Builder setSetup(LifecycleMethod setup);
abstract Builder setTeardown(LifecycleMethod teardown);
+ abstract Builder setGetInitialRestriction(GetInitialRestrictionMethod getInitialRestriction);
+ abstract Builder setSplitRestriction(SplitRestrictionMethod splitRestriction);
+ abstract Builder setGetRestrictionCoder(GetRestrictionCoderMethod getRestrictionCoder);
+ abstract Builder setNewTracker(NewTrackerMethod newTracker);
abstract DoFnSignature build();
}
@@ -80,6 +109,7 @@ public abstract class DoFnSignature {
BOUNDED_WINDOW,
INPUT_PROVIDER,
OUTPUT_RECEIVER,
+ RESTRICTION_TRACKER
}
/** Describes a {@link DoFn.ProcessElement} method. */
@@ -92,17 +122,33 @@ public abstract class DoFnSignature {
/** Types of optional parameters of the annotated method, in the order they appear. */
public abstract List<Parameter> extraParameters();
+ /**
+ * Concrete type of the {@link RestrictionTracker} parameter, or {@code null} if the method does
+ * not take one.
+ */
+ @Nullable
+ abstract TypeToken<?> trackerT();
+
+ /**
+ * Whether the method returns a {@link ProcessContinuation} ({@code true}) rather than {@code
+ * void} ({@code false}).
+ */
+ public abstract boolean hasReturnValue();
+
+ /**
+ * Creates a {@link ProcessElementMethod}. {@code extraParameters} is wrapped in an unmodifiable
+ * view, and {@code trackerT} is {@code null} when there is no {@link RestrictionTracker}
+ * parameter.
+ */
static ProcessElementMethod create(
Method targetMethod,
- List<Parameter> extraParameters) {
+ List<Parameter> extraParameters,
+ TypeToken<?> trackerT,
+ boolean hasReturnValue) {
return new AutoValue_DoFnSignature_ProcessElementMethod(
- targetMethod, Collections.unmodifiableList(extraParameters));
+ targetMethod, Collections.unmodifiableList(extraParameters), trackerT, hasReturnValue);
}
/** Whether this {@link DoFn} uses a Single Window. */
public boolean usesSingleWindow() {
return extraParameters().contains(Parameter.BOUNDED_WINDOW);
}
+
+ /**
+ * Whether this {@link DoFn} is <a href="https://s.apache.org/splittable-do-fn">splittable</a>,
+ * i.e. its {@link DoFn.ProcessElement} method takes a {@link RestrictionTracker} parameter.
+ */
+ public boolean isSplittable() {
+ return extraParameters().contains(Parameter.RESTRICTION_TRACKER);
+ }
}
/** Describes a {@link DoFn.StartBundle} or {@link DoFn.FinishBundle} method. */
@@ -128,4 +174,68 @@ public abstract class DoFnSignature {
return new AutoValue_DoFnSignature_LifecycleMethod(targetMethod);
}
}
+
+ /** Describes a {@link DoFn.GetInitialRestriction} method. */
+ @AutoValue
+ public abstract static class GetInitialRestrictionMethod implements DoFnMethod {
+ /** The annotated method itself. */
+ @Override
+ public abstract Method targetMethod();
+
+ /** Type of the returned restriction. */
+ abstract TypeToken<?> restrictionT();
+
+ /** Creates a descriptor; AutoValue constructor arguments follow property declaration order. */
+ static GetInitialRestrictionMethod create(Method targetMethod, TypeToken<?> restrictionT) {
+ return new AutoValue_DoFnSignature_GetInitialRestrictionMethod(targetMethod, restrictionT);
+ }
+ }
+
+ /** Describes a {@link DoFn.SplitRestriction} method. */
+ @AutoValue
+ public abstract static class SplitRestrictionMethod implements DoFnMethod {
+ /** The annotated method itself. */
+ @Override
+ public abstract Method targetMethod();
+
+ /**
+ * Type of the restriction: taken as the second argument, and emitted through the {@link
+ * DoFn.OutputReceiver} passed as the third argument.
+ */
+ abstract TypeToken<?> restrictionT();
+
+ /** Creates a descriptor; AutoValue constructor arguments follow property declaration order. */
+ static SplitRestrictionMethod create(Method targetMethod, TypeToken<?> restrictionT) {
+ return new AutoValue_DoFnSignature_SplitRestrictionMethod(targetMethod, restrictionT);
+ }
+ }
+
+ /** Describes a {@link DoFn.NewTracker} method. */
+ @AutoValue
+ public abstract static class NewTrackerMethod implements DoFnMethod {
+ /** The annotated method itself. */
+ @Override
+ public abstract Method targetMethod();
+
+ /** Type of the input restriction. */
+ abstract TypeToken<?> restrictionT();
+
+ /** Type of the returned {@link RestrictionTracker}. */
+ abstract TypeToken<?> trackerT();
+
+ /** Creates a descriptor; AutoValue constructor arguments follow property declaration order. */
+ static NewTrackerMethod create(
+ Method targetMethod, TypeToken<?> restrictionT, TypeToken<?> trackerT) {
+ return new AutoValue_DoFnSignature_NewTrackerMethod(targetMethod, restrictionT, trackerT);
+ }
+ }
+
+ /** Describes a {@link DoFn.GetRestrictionCoder} method. */
+ @AutoValue
+ public abstract static class GetRestrictionCoderMethod implements DoFnMethod {
+ /** The annotated method itself. */
+ @Override
+ public abstract Method targetMethod();
+
+ /** Type of the returned {@link Coder}. */
+ abstract TypeToken<?> coderT();
+
+ /** Creates a descriptor; AutoValue constructor arguments follow property declaration order. */
+ static GetRestrictionCoderMethod create(Method targetMethod, TypeToken<?> coderT) {
+ return new AutoValue_DoFnSignature_GetRestrictionCoderMethod(targetMethod, coderT);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0a24883/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
index ad15127..524ea24 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
@@ -17,6 +17,8 @@
*/
package org.apache.beam.sdk.transforms.reflect;
+import static com.google.common.base.Preconditions.checkState;
+
import com.google.common.annotations.VisibleForTesting;
import com.google.common.reflect.TypeParameter;
import com.google.common.reflect.TypeToken;
@@ -34,9 +36,12 @@ import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.common.ReflectHelpers;
+import org.apache.beam.sdk.values.PCollection;
/**
* Parses a {@link DoFn} and computes its {@link DoFnSignature}. See {@link #getOrParseSignature}.
@@ -88,6 +93,14 @@ public class DoFnSignatures {
Method setupMethod = findAnnotatedMethod(errors, DoFn.Setup.class, fnClass, false);
Method teardownMethod = findAnnotatedMethod(errors, DoFn.Teardown.class, fnClass, false);
+ Method getInitialRestrictionMethod =
+ findAnnotatedMethod(errors, DoFn.GetInitialRestriction.class, fnClass, false);
+ Method splitRestrictionMethod =
+ findAnnotatedMethod(errors, DoFn.SplitRestriction.class, fnClass, false);
+ Method getRestrictionCoderMethod =
+ findAnnotatedMethod(errors, DoFn.GetRestrictionCoder.class, fnClass, false);
+ Method newTrackerMethod = findAnnotatedMethod(errors, DoFn.NewTracker.class, fnClass, false);
+
ErrorReporter processElementErrors =
errors.forMethod(DoFn.ProcessElement.class, processElementMethod);
DoFnSignature.ProcessElementMethod processElement =
@@ -119,7 +132,213 @@ public class DoFnSignatures {
errors.forMethod(DoFn.Teardown.class, teardownMethod), teardownMethod));
}
- return builder.build();
+ DoFnSignature.GetInitialRestrictionMethod getInitialRestriction = null;
+ ErrorReporter getInitialRestrictionErrors = null;
+ if (getInitialRestrictionMethod != null) {
+ getInitialRestrictionErrors =
+ errors.forMethod(DoFn.GetInitialRestriction.class, getInitialRestrictionMethod);
+ builder.setGetInitialRestriction(
+ getInitialRestriction =
+ analyzeGetInitialRestrictionMethod(
+ getInitialRestrictionErrors, fnToken, getInitialRestrictionMethod, inputT));
+ }
+
+ DoFnSignature.SplitRestrictionMethod splitRestriction = null;
+ if (splitRestrictionMethod != null) {
+ ErrorReporter splitRestrictionErrors =
+ errors.forMethod(DoFn.SplitRestriction.class, splitRestrictionMethod);
+ builder.setSplitRestriction(
+ splitRestriction =
+ analyzeSplitRestrictionMethod(
+ splitRestrictionErrors, fnToken, splitRestrictionMethod, inputT));
+ }
+
+ DoFnSignature.GetRestrictionCoderMethod getRestrictionCoder = null;
+ if (getRestrictionCoderMethod != null) {
+ ErrorReporter getRestrictionCoderErrors =
+ errors.forMethod(DoFn.GetRestrictionCoder.class, getRestrictionCoderMethod);
+ builder.setGetRestrictionCoder(
+ getRestrictionCoder =
+ analyzeGetRestrictionCoderMethod(
+ getRestrictionCoderErrors, fnToken, getRestrictionCoderMethod));
+ }
+
+ DoFnSignature.NewTrackerMethod newTracker = null;
+ if (newTrackerMethod != null) {
+ ErrorReporter newTrackerErrors = errors.forMethod(DoFn.NewTracker.class, newTrackerMethod);
+ builder.setNewTracker(
+ newTracker = analyzeNewTrackerMethod(newTrackerErrors, fnToken, newTrackerMethod));
+ }
+
+ builder.setIsBoundedPerElement(inferBoundedness(fnToken, processElement, errors));
+
+ DoFnSignature signature = builder.build();
+
+ // Additional validation for splittable DoFn's.
+ if (processElement.isSplittable()) {
+ verifySplittableMethods(signature, errors);
+ } else {
+ verifyUnsplittableMethods(errors, signature);
+ }
+
+ return signature;
+ }
+
+ /**
+ * Infers the boundedness of the {@link DoFn.ProcessElement} method (whether or not it performs a
+ * bounded amount of work per element) using the following criteria:
+ *
+ * <ol>
+ * <li>If the {@link DoFn} is not splittable, then it is bounded, it must not be annotated as
+ * {@link DoFn.BoundedPerElement} or {@link DoFn.UnboundedPerElement}, and {@link
+ * DoFn.ProcessElement} must return {@code void}.
+ * <li>If the {@link DoFn} (or any of its supertypes) is annotated as {@link
+ * DoFn.BoundedPerElement} or {@link DoFn.UnboundedPerElement}, use that. At most one such
+ * annotation may appear in the whole type hierarchy.
+ * <li>Otherwise, if {@link DoFn.ProcessElement} returns {@link DoFn.ProcessContinuation},
+ * assume it is unbounded; if it returns {@code void}, assume it is bounded.
+ * </ol>
+ *
+ * <p>Violations of these rules are reported through {@code errors}.
+ *
+ * <p>NOTE(review): a {@code void} {@link DoFn.ProcessElement} on a splittable {@link DoFn}
+ * annotated {@link DoFn.UnboundedPerElement} is currently accepted (and inferred unbounded).
+ * Confirm whether it should be rejected instead.
+ */
+ private static PCollection.IsBounded inferBoundedness(
+ TypeToken<? extends DoFn> fnToken,
+ DoFnSignature.ProcessElementMethod processElement,
+ ErrorReporter errors) {
+ PCollection.IsBounded isBounded = null;
+ for (TypeToken<?> supertype : fnToken.getTypes()) {
+ Class<?> rawType = supertype.getRawType();
+ boolean annotatedBounded = rawType.isAnnotationPresent(DoFn.BoundedPerElement.class);
+ boolean annotatedUnbounded = rawType.isAnnotationPresent(DoFn.UnboundedPerElement.class);
+ if (!annotatedBounded && !annotatedUnbounded) {
+ continue;
+ }
+ // Reject both annotations on a single type, as well as annotations on more than one type
+ // of the hierarchy.
+ errors.checkArgument(
+ isBounded == null && !(annotatedBounded && annotatedUnbounded),
+ "Both @%s and @%s specified",
+ DoFn.BoundedPerElement.class.getSimpleName(),
+ DoFn.UnboundedPerElement.class.getSimpleName());
+ isBounded =
+ annotatedBounded ? PCollection.IsBounded.BOUNDED : PCollection.IsBounded.UNBOUNDED;
+ }
+
+ if (processElement.isSplittable()) {
+ if (isBounded == null) {
+ isBounded =
+ processElement.hasReturnValue()
+ ? PCollection.IsBounded.UNBOUNDED
+ : PCollection.IsBounded.BOUNDED;
+ }
+ return isBounded;
+ }
+
+ if (isBounded != null) {
+ errors.throwIllegalArgument(
+ "Non-splittable, but annotated as @%s",
+ (isBounded == PCollection.IsBounded.BOUNDED)
+ ? DoFn.BoundedPerElement.class.getSimpleName()
+ : DoFn.UnboundedPerElement.class.getSimpleName());
+ }
+ // analyzeProcessElementMethod accepts a ProcessContinuation return type regardless of the
+ // parameters, but it only makes sense together with a RestrictionTracker parameter. Report it
+ // as a user error instead of letting it surface as an IllegalStateException.
+ errors.checkArgument(
+ !processElement.hasReturnValue(),
+ "Non-splittable, but returns %s",
+ DoFn.ProcessContinuation.class.getSimpleName());
+ // Defensive: unreachable given the check above.
+ checkState(!processElement.hasReturnValue(), "Should have been inferred splittable");
+ return PCollection.IsBounded.BOUNDED;
+ }
+
+ /**
+ * Verifies properties related to methods of splittable {@link DoFn}:
+ *
+ * <ul>
+ * <li>Must declare the required {@link DoFn.GetInitialRestriction} and {@link DoFn.NewTracker}
+ * methods.
+ * <li>The tracker type taken by {@link DoFn.ProcessElement} must equal the one returned by
+ * {@link DoFn.NewTracker}.
+ * <li>The restriction type returned by {@link DoFn.GetInitialRestriction} must equal the one
+ * taken by {@link DoFn.NewTracker} and, if declared, by {@link DoFn.SplitRestriction}.
+ * <li>If {@link DoFn.GetRestrictionCoder} is declared, its return type must be a subtype of
+ * {@code Coder<RestrictionT>} (not necessarily an exact match).
+ * </ul>
+ *
+ * <p>Each failed check is reported through {@code errors}, in the order listed above.
+ */
+ private static void verifySplittableMethods(DoFnSignature signature, ErrorReporter errors) {
+ DoFnSignature.ProcessElementMethod processElement = signature.processElement();
+ DoFnSignature.GetInitialRestrictionMethod getInitialRestriction =
+ signature.getInitialRestriction();
+ DoFnSignature.NewTrackerMethod newTracker = signature.newTracker();
+ DoFnSignature.GetRestrictionCoderMethod getRestrictionCoder = signature.getRestrictionCoder();
+ DoFnSignature.SplitRestrictionMethod splitRestriction = signature.splitRestriction();
+
+ ErrorReporter processElementErrors =
+ errors.forMethod(DoFn.ProcessElement.class, processElement.targetMethod());
+
+ List<String> missingRequiredMethods = new ArrayList<>();
+ if (getInitialRestriction == null) {
+ missingRequiredMethods.add("@" + DoFn.GetInitialRestriction.class.getSimpleName());
+ }
+ if (newTracker == null) {
+ missingRequiredMethods.add("@" + DoFn.NewTracker.class.getSimpleName());
+ }
+ if (!missingRequiredMethods.isEmpty()) {
+ processElementErrors.throwIllegalArgument(
+ "Splittable, but does not define the following required methods: %s",
+ missingRequiredMethods);
+ }
+
+ // getInitialRestriction and newTracker are dereferenced below without null checks: this relies
+ // on throwIllegalArgument above never returning normally.
+ processElementErrors.checkArgument(
+ processElement.trackerT().equals(newTracker.trackerT()),
+ "Has tracker type %s, but @%s method %s uses tracker type %s",
+ formatType(processElement.trackerT()),
+ DoFn.NewTracker.class.getSimpleName(),
+ format(newTracker.targetMethod()),
+ formatType(newTracker.trackerT()));
+
+ ErrorReporter getInitialRestrictionErrors =
+ errors.forMethod(DoFn.GetInitialRestriction.class, getInitialRestriction.targetMethod());
+ TypeToken<?> restrictionT = getInitialRestriction.restrictionT();
+
+ getInitialRestrictionErrors.checkArgument(
+ restrictionT.equals(newTracker.restrictionT()),
+ "Uses restriction type %s, but @%s method %s uses restriction type %s",
+ formatType(restrictionT),
+ DoFn.NewTracker.class.getSimpleName(),
+ format(newTracker.targetMethod()),
+ formatType(newTracker.restrictionT()));
+
+ if (getRestrictionCoder != null) {
+ getInitialRestrictionErrors.checkArgument(
+ getRestrictionCoder.coderT().isSubtypeOf(coderTypeOf(restrictionT)),
+ "Uses restriction type %s, but @%s method %s returns %s "
+ + "which is not a subtype of %s",
+ formatType(restrictionT),
+ DoFn.GetRestrictionCoder.class.getSimpleName(),
+ format(getRestrictionCoder.targetMethod()),
+ formatType(getRestrictionCoder.coderT()),
+ formatType(coderTypeOf(restrictionT)));
+ }
+
+ if (splitRestriction != null) {
+ getInitialRestrictionErrors.checkArgument(
+ splitRestriction.restrictionT().equals(restrictionT),
+ "Uses restriction type %s, but @%s method %s uses restriction type %s",
+ formatType(restrictionT),
+ DoFn.SplitRestriction.class.getSimpleName(),
+ format(splitRestriction.targetMethod()),
+ formatType(splitRestriction.restrictionT()));
+ }
+ }
+
+ /**
+ * Rejects a non-splittable {@link DoFn} that declares any method that is only meaningful for a
+ * splittable {@link DoFn}: {@link DoFn.GetInitialRestriction}, {@link DoFn.SplitRestriction},
+ * {@link DoFn.NewTracker} or {@link DoFn.GetRestrictionCoder}. All offending annotations are
+ * listed together in a single error.
+ */
+ private static void verifyUnsplittableMethods(ErrorReporter errors, DoFnSignature signature) {
+ List<String> splittableOnly = new ArrayList<>();
+ addIfDeclared(
+ splittableOnly, signature.getInitialRestriction(), DoFn.GetInitialRestriction.class);
+ addIfDeclared(splittableOnly, signature.splitRestriction(), DoFn.SplitRestriction.class);
+ addIfDeclared(splittableOnly, signature.newTracker(), DoFn.NewTracker.class);
+ addIfDeclared(
+ splittableOnly, signature.getRestrictionCoder(), DoFn.GetRestrictionCoder.class);
+ errors.checkArgument(
+ splittableOnly.isEmpty(), "Non-splittable, but defines methods: %s", splittableOnly);
+ }
+
+ /** Appends {@code "@"} plus the annotation's simple name to {@code names} if {@code method} is set. */
+ private static void addIfDeclared(
+ List<String> names, @Nullable Object method, Class<? extends Annotation> annotation) {
+ if (method != null) {
+ names.add("@" + annotation.getSimpleName());
+ }
}
/**
@@ -166,7 +385,11 @@ public class DoFnSignatures {
Method m,
TypeToken<?> inputT,
TypeToken<?> outputT) {
- errors.checkArgument(void.class.equals(m.getReturnType()), "Must return void");
+ errors.checkArgument(
+ void.class.equals(m.getReturnType())
+ || DoFn.ProcessContinuation.class.equals(m.getReturnType()),
+ "Must return void or %s",
+ DoFn.ProcessContinuation.class.getSimpleName());
TypeToken<?> processContextToken = doFnProcessContextTypeOf(inputT, outputT);
@@ -181,6 +404,7 @@ public class DoFnSignatures {
formatType(processContextToken));
List<DoFnSignature.Parameter> extraParameters = new ArrayList<>();
+ TypeToken<?> trackerT = null;
TypeToken<?> expectedInputProviderT = inputProviderTypeOf(inputT);
TypeToken<?> expectedOutputReceiverT = outputReceiverTypeOf(outputT);
@@ -190,38 +414,62 @@ public class DoFnSignatures {
if (rawType.equals(BoundedWindow.class)) {
errors.checkArgument(
!extraParameters.contains(DoFnSignature.Parameter.BOUNDED_WINDOW),
- "Multiple BoundedWindow parameters");
+ "Multiple %s parameters",
+ BoundedWindow.class.getSimpleName());
extraParameters.add(DoFnSignature.Parameter.BOUNDED_WINDOW);
} else if (rawType.equals(DoFn.InputProvider.class)) {
errors.checkArgument(
!extraParameters.contains(DoFnSignature.Parameter.INPUT_PROVIDER),
- "Multiple InputProvider parameters");
+ "Multiple %s parameters",
+ DoFn.InputProvider.class.getSimpleName());
errors.checkArgument(
paramT.equals(expectedInputProviderT),
- "Wrong type of InputProvider parameter: %s, should be %s",
+ "Wrong type of %s parameter: %s, should be %s",
+ DoFn.InputProvider.class.getSimpleName(),
formatType(paramT),
formatType(expectedInputProviderT));
extraParameters.add(DoFnSignature.Parameter.INPUT_PROVIDER);
} else if (rawType.equals(DoFn.OutputReceiver.class)) {
errors.checkArgument(
!extraParameters.contains(DoFnSignature.Parameter.OUTPUT_RECEIVER),
- "Multiple OutputReceiver parameters");
+ "Multiple %s parameters",
+ DoFn.OutputReceiver.class.getSimpleName());
errors.checkArgument(
paramT.equals(expectedOutputReceiverT),
- "Wrong type of OutputReceiver parameter: %s, should be %s",
+ "Wrong type of %s parameter: %s, should be %s",
+ DoFn.OutputReceiver.class.getSimpleName(),
formatType(paramT),
formatType(expectedOutputReceiverT));
extraParameters.add(DoFnSignature.Parameter.OUTPUT_RECEIVER);
+ } else if (RestrictionTracker.class.isAssignableFrom(rawType)) {
+ errors.checkArgument(
+ !extraParameters.contains(DoFnSignature.Parameter.RESTRICTION_TRACKER),
+ "Multiple %s parameters",
+ RestrictionTracker.class.getSimpleName());
+ extraParameters.add(DoFnSignature.Parameter.RESTRICTION_TRACKER);
+ trackerT = paramT;
} else {
List<String> allowedParamTypes =
- Arrays.asList(formatType(new TypeToken<BoundedWindow>() {}));
+ Arrays.asList(
+ formatType(new TypeToken<BoundedWindow>() {}),
+ formatType(new TypeToken<RestrictionTracker<?>>() {}));
errors.throwIllegalArgument(
"%s is not a valid context parameter. Should be one of %s",
formatType(paramT), allowedParamTypes);
}
}
- return DoFnSignature.ProcessElementMethod.create(m, extraParameters);
+ // A splittable DoFn can not have any other extra context parameters.
+ if (extraParameters.contains(DoFnSignature.Parameter.RESTRICTION_TRACKER)) {
+ errors.checkArgument(
+ extraParameters.size() == 1,
+ "Splittable DoFn must not have any extra context arguments apart from %s, but has: %s",
+ trackerT,
+ extraParameters);
+ }
+
+ return DoFnSignature.ProcessElementMethod.create(
+ m, extraParameters, trackerT, DoFn.ProcessContinuation.class.equals(m.getReturnType()));
}
@VisibleForTesting
@@ -248,6 +496,100 @@ public class DoFnSignatures {
return DoFnSignature.LifecycleMethod.create(m);
}
+ @VisibleForTesting
+ static DoFnSignature.GetInitialRestrictionMethod analyzeGetInitialRestrictionMethod(
+ ErrorReporter errors, TypeToken<? extends DoFn> fnToken, Method m, TypeToken<?> inputT) {
+ // Expected shape:
+ // @GetInitialRestriction
+ // RestrictionT getInitialRestriction(InputT element);
+ // The restriction type is the return type, resolved against the DoFn's own type.
+ Type[] paramTypes = m.getGenericParameterTypes();
+ boolean takesOnlyElement =
+ paramTypes.length == 1 && fnToken.resolveType(paramTypes[0]).equals(inputT);
+ errors.checkArgument(
+ takesOnlyElement, "Must take a single argument of type %s", formatType(inputT));
+ TypeToken<?> restrictionT = fnToken.resolveType(m.getGenericReturnType());
+ return DoFnSignature.GetInitialRestrictionMethod.create(m, restrictionT);
+ }
+
+ /**
+ * Generates a type token for {@code List<T>} given {@code T}: the anonymous {@link TypeToken}
+ * subclass captures {@code List<T>}, and {@code where} substitutes {@code elementT} for {@code T}.
+ *
+ * <p>NOTE(review): not referenced by any code visible in this diff; consider removing it.
+ */
+ private static <T> TypeToken<List<T>> listTypeOf(TypeToken<T> elementT) {
+ return new TypeToken<List<T>>() {}.where(new TypeParameter<T>() {}, elementT);
+ }
+
+ /**
+ * Analyzes a {@link DoFn.SplitRestriction} method. It must return {@code void} and take exactly
+ * three arguments: the element, the restriction, and a {@link DoFn.OutputReceiver} of the
+ * restriction type through which the split parts are emitted.
+ */
+ @VisibleForTesting
+ static DoFnSignature.SplitRestrictionMethod analyzeSplitRestrictionMethod(
+ ErrorReporter errors, TypeToken<? extends DoFn> fnToken, Method m, TypeToken<?> inputT) {
+ // Method is of the form:
+ // @SplitRestriction
+ // void splitRestriction(
+ // InputT element, RestrictionT restriction, OutputReceiver<RestrictionT> receiver);
+ errors.checkArgument(void.class.equals(m.getReturnType()), "Must return void");
+
+ Type[] params = m.getGenericParameterTypes();
+ errors.checkArgument(params.length == 3, "Must have exactly 3 arguments");
+ errors.checkArgument(
+ fnToken.resolveType(params[0]).equals(inputT),
+ "First argument must be the element type %s",
+ formatType(inputT));
+
+ TypeToken<?> restrictionT = fnToken.resolveType(params[1]);
+ TypeToken<?> receiverT = fnToken.resolveType(params[2]);
+ TypeToken<?> expectedReceiverT = outputReceiverTypeOf(restrictionT);
+ errors.checkArgument(
+ receiverT.equals(expectedReceiverT),
+ "Third argument must be %s, but is %s",
+ formatType(expectedReceiverT),
+ formatType(receiverT));
+
+ return DoFnSignature.SplitRestrictionMethod.create(m, restrictionT);
+ }
+
+ /**
+ * Generates a type token for {@code Coder<T>} given {@code T}: the anonymous {@link TypeToken}
+ * subclass captures {@code Coder<T>}, and {@code where} substitutes {@code elementT} for {@code T}.
+ */
+ private static <T> TypeToken<Coder<T>> coderTypeOf(TypeToken<T> elementT) {
+ return new TypeToken<Coder<T>>() {}.where(new TypeParameter<T>() {}, elementT);
+ }
+
+ @VisibleForTesting
+ static DoFnSignature.GetRestrictionCoderMethod analyzeGetRestrictionCoderMethod(
+ ErrorReporter errors, TypeToken<? extends DoFn> fnToken, Method m) {
+ // Method is of the form:
+ // @GetRestrictionCoder
+ // CoderT getRestrictionCoder();
+ boolean takesNoArguments = m.getParameterTypes().length == 0;
+ errors.checkArgument(takesNoArguments, "Must have zero arguments");
+
+ TypeToken<?> coderT = fnToken.resolveType(m.getGenericReturnType());
+ boolean returnsCoder = coderT.isSubtypeOf(TypeToken.of(Coder.class));
+ errors.checkArgument(returnsCoder, "Must return a Coder, but returns %s", formatType(coderT));
+ return DoFnSignature.GetRestrictionCoderMethod.create(m, coderT);
+ }
+
+ /**
+ * Generates a type token for {@code RestrictionTracker<RestrictionT>} given {@code RestrictionT}.
+ * Used to check that a {@link DoFn.NewTracker} method returns a tracker for the restriction type
+ * it takes.
+ */
+ private static <RestrictionT>
+ TypeToken<RestrictionTracker<RestrictionT>> restrictionTrackerTypeOf(
+ TypeToken<RestrictionT> restrictionT) {
+ return new TypeToken<RestrictionTracker<RestrictionT>>() {}.where(
+ new TypeParameter<RestrictionT>() {}, restrictionT);
+ }
+
+ @VisibleForTesting
+ static DoFnSignature.NewTrackerMethod analyzeNewTrackerMethod(
+ ErrorReporter errors, TypeToken<? extends DoFn> fnToken, Method m) {
+ // Expected shape:
+ // @NewTracker
+ // TrackerT newTracker(RestrictionT restriction);
+ // where TrackerT must be a subtype of RestrictionTracker<RestrictionT>.
+ Type[] paramTypes = m.getGenericParameterTypes();
+ errors.checkArgument(paramTypes.length == 1, "Must have a single argument");
+
+ TypeToken<?> restrictionT = fnToken.resolveType(paramTypes[0]);
+ TypeToken<?> trackerT = fnToken.resolveType(m.getGenericReturnType());
+ TypeToken<?> requiredTrackerT = restrictionTrackerTypeOf(restrictionT);
+ boolean tracksRestriction = trackerT.isSubtypeOf(requiredTrackerT);
+ errors.checkArgument(
+ tracksRestriction,
+ "Returns %s, but must return a subtype of %s",
+ formatType(trackerT),
+ formatType(requiredTrackerT));
+ return DoFnSignature.NewTrackerMethod.create(m, restrictionT, trackerT);
+ }
+
private static Collection<Method> declaredMethodsWithAnnotation(
Class<? extends Annotation> anno, Class<?> startClass, Class<?> stopClass) {
Collection<Method> matches = new ArrayList<>();
@@ -310,7 +652,7 @@ public class DoFnSignatures {
}
+ /** Formats {@code method} for error messages via {@link ReflectHelpers#METHOD_FORMATTER}. */
private static String format(Method method) {
- return ReflectHelpers.CLASS_AND_METHOD_FORMATTER.apply(method);
+ return ReflectHelpers.METHOD_FORMATTER.apply(method);
}
private static String formatType(TypeToken<?> t) {
@@ -327,7 +669,9 @@ public class DoFnSignatures {
+ /**
+ * Returns a child reporter whose context is {@code "@Annotation method"}, using the
+ * annotation's simple name, and {@code "(absent)"} in place of the method when it is null.
+ */
ErrorReporter forMethod(Class<? extends Annotation> annotation, Method method) {
return new ErrorReporter(
this,
- String.format("@%s %s", annotation, (method == null) ? "(absent)" : format(method)));
+ String.format(
+ "@%s %s",
+ annotation.getSimpleName(), (method == null) ? "(absent)" : format(method)));
}
void throwIllegalArgument(String message, Object... args) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0a24883/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java
new file mode 100644
index 0000000..6b249ee
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.splittabledofn;
+
+import org.apache.beam.sdk.transforms.DoFn;
+
+/**
+ * Manages concurrent access to the restriction and keeps track of its claimed part for a <a
+ * href="https://s.apache.org/splittable-do-fn">splittable</a> {@link DoFn}.
+ *
+ * @param <RestrictionT> the type of restriction being tracked
+ */
+public interface RestrictionTracker<RestrictionT> {
+ /**
+ * Returns a restriction accurately describing the full range of work the current {@link
+ * DoFn.ProcessElement} call will do, including already completed work.
+ */
+ RestrictionT currentRestriction();
+
+ /**
+ * Signals that the current {@link DoFn.ProcessElement} call should terminate as soon as possible.
+ * Modifies {@link #currentRestriction}. Returns a restriction representing the rest of the work:
+ * the old value of {@link #currentRestriction} is equivalent to the new value and the return
+ * value of this method combined.
+ *
+ * @return a restriction describing the work remaining after the checkpoint
+ */
+ RestrictionT checkpoint();
+
+ // TODO: Add the more general splitRemainderAfterFraction() and other methods.
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0a24883/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/package-info.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/package-info.java
new file mode 100644
index 0000000..1ceb880
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Defines utilities related to <a href="https://s.apache.org/splittable-do-fn">splittable</a>
+ * {@link org.apache.beam.sdk.transforms.DoFn}'s.
+ */
+package org.apache.beam.sdk.transforms.splittabledofn;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0a24883/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/KvCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/KvCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/KvCoderTest.java
index f0f7d22..436e227 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/KvCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/KvCoderTest.java
@@ -17,11 +17,9 @@
*/
package org.apache.beam.sdk.coders;
-import com.google.common.collect.ImmutableMap;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
-import java.util.Map;
import org.apache.beam.sdk.testing.CoderProperties;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.values.KV;
@@ -31,40 +29,55 @@ import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
-/**
- * Test case for {@link KvCoder}.
- */
+/** Test case for {@link KvCoder}. */
@RunWith(JUnit4.class)
public class KvCoderTest {
+ /** A coder paired with sample values of its element type. */
+ private static class CoderAndData<T> {
+ Coder<T> coder;
+ List<T> data;
+ }
+
+ /**
+ * Holds a {@link CoderAndData} of unknown element type, so pairs with different element types
+ * can live in a single list.
+ */
+ private static class AnyCoderAndData {
+ private CoderAndData<?> coderAndData;
+ }
- private static final Map<Coder<?>, Iterable<?>> TEST_DATA =
- new ImmutableMap.Builder<Coder<?>, Iterable<?>>()
- .put(VarIntCoder.of(),
- Arrays.asList(-1, 0, 1, 13, Integer.MAX_VALUE, Integer.MIN_VALUE))
- .put(BigEndianLongCoder.of(),
- Arrays.asList(-1L, 0L, 1L, 13L, Long.MAX_VALUE, Long.MIN_VALUE))
- .put(StringUtf8Coder.of(),
- Arrays.asList("", "hello", "goodbye", "1"))
- .put(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()),
- Arrays.asList(KV.of("", -1), KV.of("hello", 0), KV.of("goodbye", Integer.MAX_VALUE)))
- .put(ListCoder.of(VarLongCoder.of()),
- Arrays.asList(
- Arrays.asList(1L, 2L, 3L),
- Collections.emptyList()))
- .build();
+ /**
+ * Pairs {@code coder} with sample {@code data}. The shared {@code T} lets the compiler check that
+ * the values match the coder before the type is hidden behind the wildcard in {@link
+ * AnyCoderAndData}.
+ */
+ private static <T> AnyCoderAndData coderAndData(Coder<T> coder, List<T> data) {
+ CoderAndData<T> typed = new CoderAndData<>();
+ typed.data = data;
+ typed.coder = coder;
+
+ AnyCoderAndData wrapper = new AnyCoderAndData();
+ wrapper.coderAndData = typed;
+ return wrapper;
+ }
+
+ /**
+ * Sample coders with values; each entry is used both as the key side and as the value side of
+ * the {@link KvCoder}s exercised by {@code testDecodeEncodeEqual}.
+ */
+ private static final List<AnyCoderAndData> TEST_DATA =
+ Arrays.asList(
+ coderAndData(
+ VarIntCoder.of(), Arrays.asList(-1, 0, 1, 13, Integer.MAX_VALUE, Integer.MIN_VALUE)),
+ coderAndData(
+ BigEndianLongCoder.of(),
+ Arrays.asList(-1L, 0L, 1L, 13L, Long.MAX_VALUE, Long.MIN_VALUE)),
+ coderAndData(StringUtf8Coder.of(), Arrays.asList("", "hello", "goodbye", "1")),
+ coderAndData(
+ KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()),
+ Arrays.asList(KV.of("", -1), KV.of("hello", 0), KV.of("goodbye", Integer.MAX_VALUE))),
+ coderAndData(
+ ListCoder.of(VarLongCoder.of()),
+ Arrays.asList(Arrays.asList(1L, 2L, 3L), Collections.<Long>emptyList())));
+ /**
+ * Checks that every {@code KV} built from a (key, value) pair drawn from {@link #TEST_DATA}
+ * survives an encode/decode round trip through the matching {@link KvCoder}.
+ */
@Test
+ // Raw Coder types are needed to combine coders of different element types. Building a KvCoder
+ // from them and passing it on are unchecked operations, so "unchecked" is suppressed as well.
+ @SuppressWarnings({"rawtypes", "unchecked"})
public void testDecodeEncodeEqual() throws Exception {
- for (Map.Entry<Coder<?>, Iterable<?>> entry : TEST_DATA.entrySet()) {
- // The coder and corresponding values must be the same type.
- // If someone messes this up in the above test data, the test
- // will fail anyhow (unless the coder magically works on data
- // it does not understand).
- @SuppressWarnings("unchecked")
- Coder<Object> coder = (Coder<Object>) entry.getKey();
- Iterable<?> values = entry.getValue();
- for (Object value : values) {
- CoderProperties.coderDecodeEncodeEqual(coder, value);
+ for (AnyCoderAndData keyCoderAndData : TEST_DATA) {
+ Coder keyCoder = keyCoderAndData.coderAndData.coder;
+ for (Object key : keyCoderAndData.coderAndData.data) {
+ for (AnyCoderAndData valueCoderAndData : TEST_DATA) {
+ Coder valueCoder = valueCoderAndData.coderAndData.coder;
+ for (Object value : valueCoderAndData.coderAndData.data) {
+ CoderProperties.coderDecodeEncodeEqual(
+ KvCoder.of(keyCoder, valueCoder), KV.of(key, value));
+ }
+ }
}
}
}
@@ -75,37 +88,29 @@ public class KvCoderTest {
+ /** Checks the encoding id of a {@link KvCoder} of two {@link VarIntCoder}s. */
@Test
public void testEncodingId() throws Exception {
CoderProperties.coderHasEncodingId(
- KvCoder.of(VarIntCoder.of(), VarIntCoder.of()),
- EXPECTED_ENCODING_ID);
+ KvCoder.of(VarIntCoder.of(), VarIntCoder.of()), EXPECTED_ENCODING_ID);
}
- /**
- * Homogeneously typed test value for ease of use with the wire format test utility.
- */
+ /** Homogeneously typed test value for ease of use with the wire format test utility. */
private static final Coder<KV<String, Integer>> TEST_CODER =
KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of());
- private static final List<KV<String, Integer>> TEST_VALUES = Arrays.asList(
- KV.of("", -1),
- KV.of("hello", 0),
- KV.of("goodbye", Integer.MAX_VALUE));
+ /** Sample values for the wire-format test, encoded with {@link #TEST_CODER}. */
+ private static final List<KV<String, Integer>> TEST_VALUES =
+ Arrays.asList(KV.of("", -1), KV.of("hello", 0), KV.of("goodbye", Integer.MAX_VALUE));
/**
- * Generated data to check that the wire format has not changed. To regenerate, see
- * {@link org.apache.beam.sdk.coders.PrintBase64Encodings}.
+ * Generated data to check that the wire format has not changed. To regenerate, see {@link
+ * org.apache.beam.sdk.coders.PrintBase64Encodings}.
*/
- private static final List<String> TEST_ENCODINGS = Arrays.asList(
- "AP____8P",
- "BWhlbGxvAA",
- "B2dvb2RieWX_____Bw");
+ private static final List<String> TEST_ENCODINGS =
+ Arrays.asList("AP____8P", "BWhlbGxvAA", "B2dvb2RieWX_____Bw");
+ /**
+ * Checks that {@link #TEST_VALUES}, encoded with {@link #TEST_CODER}, match the base64 strings
+ * in {@link #TEST_ENCODINGS}.
+ */
@Test
public void testWireFormatEncode() throws Exception {
CoderProperties.coderEncodesBase64(TEST_CODER, TEST_VALUES, TEST_ENCODINGS);
}
- @Rule
- public ExpectedException thrown = ExpectedException.none();
+ @Rule public ExpectedException thrown = ExpectedException.none();
@Test
public void encodeNullThrowsCoderException() throws Exception {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0a24883/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index 7ce98bc..9c7b991 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -56,6 +56,7 @@ import org.apache.beam.sdk.transforms.ParDo.Bound;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.DisplayData.Builder;
import org.apache.beam.sdk.transforms.display.DisplayDataMatchers;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
@@ -1469,4 +1470,49 @@ public class ParDoTest implements Serializable {
assertThat(displayData, includesDisplayDataFrom(fn));
assertThat(displayData, hasDisplayItem("fn", fn.getClass()));
}
+
+  /** Abstract tracker type for {@link TestSplittableDoFn}; only its type is ever used. */
+  private abstract static class SomeTracker implements RestrictionTracker<Object> {}
+
+  /**
+   * A minimal splittable {@link DoFn}, used to check that {@link ParDo} rejects splittable fns
+   * that the runner has not overridden. Every method is a no-op returning {@code null}, because
+   * the tests only apply this fn and never execute it.
+   */
+  private static class TestSplittableDoFn extends DoFn<Integer, String> {
+    @GetInitialRestriction
+    public Object getInitialRestriction(Integer input) {
+      return null;
+    }
+
+    @NewTracker
+    public SomeTracker newTracker(Object initialRestriction) {
+      return null;
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext c, SomeTracker restrictionTracker) {}
+  }
+
+ @Test
+ public void testRejectsSplittableDoFnByDefault() {
+ // ParDo with a splittable DoFn must be overridden by the runner.
+ // Without an override, applying it directly must fail.
+ Pipeline p = TestPipeline.create();
+
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage("Splittable DoFn not supported by the current runner");
+
+ p.apply(Create.of(1, 2, 3)).apply(ParDo.of(new TestSplittableDoFn()));
+ }
+
+ @Test
+ public void testMultiRejectsSplittableDoFnByDefault() {
+ // ParDo with a splittable DoFn must be overridden by the runner.
+ // Without an override, applying it directly must fail.
+ Pipeline p = TestPipeline.create();
+
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage("Splittable DoFn not supported by the current runner");
+
+ p.apply(Create.of(1, 2, 3))
+ .apply(
+ ParDo.of(new TestSplittableDoFn())
+ .withOutputTags(
+ new TupleTag<String>("main") {},
+ TupleTagList.of(new TupleTag<String>("side1") {})));
+ }
}
[4/4] incubator-beam git commit: Closes #896
Posted by bc...@apache.org.
Closes #896
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/13b45895
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/13b45895
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/13b45895
Branch: refs/heads/master
Commit: 13b45895eb13cd48557472463404bd097d7097d7
Parents: a5d1293 a0a2488
Author: bchambers <bc...@google.com>
Authored: Wed Oct 12 17:29:21 2016 -0700
Committer: bchambers <bc...@google.com>
Committed: Wed Oct 12 17:29:21 2016 -0700
----------------------------------------------------------------------
runners/core-java/pom.xml | 6 +
.../runners/core/ElementAndRestriction.java | 42 ++
.../core/ElementAndRestrictionCoder.java | 67 +++
.../runners/core/GBKIntoKeyedWorkItems.java | 40 ++
.../beam/runners/core/SplittableParDo.java | 469 ++++++++++++++++
.../core/ElementAndRestrictionCoderTest.java | 127 +++++
.../beam/runners/core/SplittableParDoTest.java | 467 ++++++++++++++++
...ectGBKIntoKeyedWorkItemsOverrideFactory.java | 66 +++
.../beam/runners/direct/DirectRunner.java | 5 +
.../runners/direct/ParDoOverrideFactory.java | 55 ++
.../beam/runners/direct/SplittableDoFnTest.java | 225 ++++++++
.../beam/sdk/annotations/Experimental.java | 8 +-
.../org/apache/beam/sdk/transforms/DoFn.java | 218 +++++++-
.../beam/sdk/transforms/DoFnAdapters.java | 27 +-
.../apache/beam/sdk/transforms/DoFnTester.java | 117 ++--
.../org/apache/beam/sdk/transforms/ParDo.java | 19 +
.../sdk/transforms/reflect/DoFnInvoker.java | 26 +-
.../sdk/transforms/reflect/DoFnInvokers.java | 179 +++++-
.../sdk/transforms/reflect/DoFnSignature.java | 114 +++-
.../sdk/transforms/reflect/DoFnSignatures.java | 366 ++++++++++++-
.../splittabledofn/RestrictionTracker.java | 42 ++
.../transforms/splittabledofn/package-info.java | 22 +
.../org/apache/beam/sdk/coders/KvCoderTest.java | 99 ++--
.../apache/beam/sdk/transforms/ParDoTest.java | 46 ++
.../transforms/reflect/DoFnInvokersTest.java | 261 ++++++++-
.../DoFnSignaturesProcessElementTest.java | 4 +-
.../DoFnSignaturesSplittableDoFnTest.java | 543 +++++++++++++++++++
27 files changed, 3495 insertions(+), 165 deletions(-)
----------------------------------------------------------------------
[3/4] incubator-beam git commit: [BEAM-65] SplittableDoFn prototype.
Posted by bc...@apache.org.
[BEAM-65] SplittableDoFn prototype.
Work in progress. Currently only runs in direct runner,
and not ready for any use by real users.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a0a24883
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a0a24883
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a0a24883
Branch: refs/heads/master
Commit: a0a24883737850052f54290255780e868c0b63dc
Parents: a5d1293
Author: Eugene Kirpichov <ki...@google.com>
Authored: Thu Aug 11 17:13:53 2016 -0700
Committer: bchambers <bc...@google.com>
Committed: Wed Oct 12 17:29:20 2016 -0700
----------------------------------------------------------------------
runners/core-java/pom.xml | 6 +
.../runners/core/ElementAndRestriction.java | 42 ++
.../core/ElementAndRestrictionCoder.java | 67 +++
.../runners/core/GBKIntoKeyedWorkItems.java | 40 ++
.../beam/runners/core/SplittableParDo.java | 469 ++++++++++++++++
.../core/ElementAndRestrictionCoderTest.java | 127 +++++
.../beam/runners/core/SplittableParDoTest.java | 467 ++++++++++++++++
...ectGBKIntoKeyedWorkItemsOverrideFactory.java | 66 +++
.../beam/runners/direct/DirectRunner.java | 5 +
.../runners/direct/ParDoOverrideFactory.java | 55 ++
.../beam/runners/direct/SplittableDoFnTest.java | 225 ++++++++
.../beam/sdk/annotations/Experimental.java | 8 +-
.../org/apache/beam/sdk/transforms/DoFn.java | 218 +++++++-
.../beam/sdk/transforms/DoFnAdapters.java | 27 +-
.../apache/beam/sdk/transforms/DoFnTester.java | 117 ++--
.../org/apache/beam/sdk/transforms/ParDo.java | 19 +
.../sdk/transforms/reflect/DoFnInvoker.java | 26 +-
.../sdk/transforms/reflect/DoFnInvokers.java | 179 +++++-
.../sdk/transforms/reflect/DoFnSignature.java | 114 +++-
.../sdk/transforms/reflect/DoFnSignatures.java | 366 ++++++++++++-
.../splittabledofn/RestrictionTracker.java | 42 ++
.../transforms/splittabledofn/package-info.java | 22 +
.../org/apache/beam/sdk/coders/KvCoderTest.java | 99 ++--
.../apache/beam/sdk/transforms/ParDoTest.java | 46 ++
.../transforms/reflect/DoFnInvokersTest.java | 261 ++++++++-
.../DoFnSignaturesProcessElementTest.java | 4 +-
.../DoFnSignaturesSplittableDoFnTest.java | 543 +++++++++++++++++++
27 files changed, 3495 insertions(+), 165 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0a24883/runners/core-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/core-java/pom.xml b/runners/core-java/pom.xml
index d958dd2..d84c420 100644
--- a/runners/core-java/pom.xml
+++ b/runners/core-java/pom.xml
@@ -190,6 +190,12 @@
</dependency>
<dependency>
+ <groupId>com.google.auto.value</groupId>
+ <artifactId>auto-value</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>annotations</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0a24883/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestriction.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestriction.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestriction.java
new file mode 100644
index 0000000..4a5d0c4
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestriction.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import com.google.auto.value.AutoValue;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.transforms.DoFn;
+
+/**
+ * A tuple of an element and a restriction applied to processing it with a
+ * <a href="https://s.apache.org/splittable-do-fn">splittable</a> {@link DoFn}.
+ *
+ * @param <ElementT> the type of the element
+ * @param <RestrictionT> the type of the restriction
+ */
+@Experimental(Experimental.Kind.SPLITTABLE_DO_FN)
+@AutoValue
+public abstract class ElementAndRestriction<ElementT, RestrictionT> {
+  /** The element to process. */
+  public abstract ElementT element();
+
+  /** The restriction applied to processing the element. */
+  public abstract RestrictionT restriction();
+
+  /**
+   * Constructs an {@link ElementAndRestriction} holding the given element and restriction.
+   *
+   * @param element the element to process
+   * @param restriction the restriction applied to processing {@code element}
+   * @return the pair of {@code element} and {@code restriction}
+   */
+  public static <ElementT, RestrictionT> ElementAndRestriction<ElementT, RestrictionT> of(
+      ElementT element, RestrictionT restriction) {
+    return new AutoValue_ElementAndRestriction<>(element, restriction);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0a24883/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java
new file mode 100644
index 0000000..6dec8e2
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ElementAndRestrictionCoder.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
+
+/**
+ * A {@link Coder} for {@link ElementAndRestriction}.
+ *
+ * <p>The encoding is the element, written by the element coder in the nested context, directly
+ * followed by the restriction, written by the restriction coder in the caller's context. Encoding
+ * a {@code null} value fails with a {@link CoderException}.
+ */
+@Experimental(Experimental.Kind.SPLITTABLE_DO_FN)
+public class ElementAndRestrictionCoder<ElementT, RestrictionT>
+    extends CustomCoder<ElementAndRestriction<ElementT, RestrictionT>> {
+  private final Coder<ElementT> elementCoder;
+  private final Coder<RestrictionT> restrictionCoder;
+
+  private ElementAndRestrictionCoder(
+      Coder<ElementT> elementCoder, Coder<RestrictionT> restrictionCoder) {
+    this.elementCoder = elementCoder;
+    this.restrictionCoder = restrictionCoder;
+  }
+
+  /**
+   * Returns a coder that writes the element with {@code elementCoder} and the restriction with
+   * {@code restrictionCoder}.
+   */
+  public static <ElementT, RestrictionT> ElementAndRestrictionCoder<ElementT, RestrictionT> of(
+      Coder<ElementT> elementCoder, Coder<RestrictionT> restrictionCoder) {
+    return new ElementAndRestrictionCoder<>(elementCoder, restrictionCoder);
+  }
+
+  @Override
+  public void encode(
+      ElementAndRestriction<ElementT, RestrictionT> value, OutputStream outStream, Context context)
+      throws IOException {
+    if (value == null) {
+      throw new CoderException("cannot encode a null ElementAndRestriction");
+    }
+    // The element is followed by the restriction in the stream, hence the nested context.
+    Context elementContext = context.nested();
+    elementCoder.encode(value.element(), outStream, elementContext);
+    restrictionCoder.encode(value.restriction(), outStream, context);
+  }
+
+  @Override
+  public ElementAndRestriction<ElementT, RestrictionT> decode(InputStream inStream, Context context)
+      throws IOException {
+    Context elementContext = context.nested();
+    ElementT element = elementCoder.decode(inStream, elementContext);
+    RestrictionT restriction = restrictionCoder.decode(inStream, context);
+    return ElementAndRestriction.of(element, restriction);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0a24883/runners/core-java/src/main/java/org/apache/beam/runners/core/GBKIntoKeyedWorkItems.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GBKIntoKeyedWorkItems.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GBKIntoKeyedWorkItems.java
new file mode 100644
index 0000000..ca4d681
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GBKIntoKeyedWorkItems.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.KeyedWorkItem;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+
+/**
+ * Interface for creating a runner-specific {@link GroupByKey GroupByKey-like} {@link PTransform}
+ * that produces {@link KeyedWorkItem KeyedWorkItems} so that downstream transforms can access state
+ * and timers.
+ */
+@Experimental(Experimental.Kind.SPLITTABLE_DO_FN)
+public class GBKIntoKeyedWorkItems<KeyT, InputT>
+ extends PTransform<PCollection<KV<KeyT, InputT>>, PCollection<KeyedWorkItem<KeyT, InputT>>> {
+ @Override
+ public PCollection<KeyedWorkItem<KeyT, InputT>> apply(PCollection<KV<KeyT, InputT>> input) {
+ return PCollection.createPrimitiveOutputInternal(
+ input.getPipeline(), input.getWindowingStrategy(), input.isBounded());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0a24883/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
new file mode 100644
index 0000000..7645149
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
@@ -0,0 +1,469 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Iterables;
+import java.util.UUID;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.KeyedWorkItem;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.apache.beam.sdk.util.TimerInternals;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingInternals;
+import org.apache.beam.sdk.util.state.StateNamespace;
+import org.apache.beam.sdk.util.state.StateNamespaces;
+import org.apache.beam.sdk.util.state.StateTag;
+import org.apache.beam.sdk.util.state.StateTags;
+import org.apache.beam.sdk.util.state.ValueState;
+import org.apache.beam.sdk.util.state.WatermarkHoldState;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.joda.time.Instant;
+
+/**
+ * A utility transform that executes a <a
+ * href="https://s.apache.org/splittable-do-fn">splittable</a> {@link DoFn} by expanding it into a
+ * network of simpler transforms:
+ *
+ * <ol>
+ * <li>Pair each element with an initial restriction
+ * <li>Split each restriction into sub-restrictions
+ * <li>Assign a unique key to each element/restriction pair
+ * <li>Group by key (so that work is partitioned by key and we can access state/timers)
+ * <li>Process each keyed element/restriction pair with the splittable {@link DoFn}'s {@link
+ * DoFn.ProcessElement} method, using state and timers API.
+ * </ol>
+ *
+ * <p>This transform is intended as a helper for internal use by runners when implementing {@code
+ * ParDo.of(splittable DoFn)}, but not for direct use by pipeline writers.
+ */
+@Experimental(Experimental.Kind.SPLITTABLE_DO_FN)
+public class SplittableParDo<
+ InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker<RestrictionT>>
+ extends PTransform<PCollection<InputT>, PCollection<OutputT>> {
+  /** The splittable {@link DoFn} being expanded. */
+  private final DoFn<InputT, OutputT> fn;
+  /** Reflective signature of {@link #fn}, parsed once in the constructor. */
+  private final DoFnSignature signature;
+
+  /**
+   * Creates the transform for the given original {@link ParDo} and {@link DoFn}.
+   *
+   * @param fn The splittable {@link DoFn} inside the original {@link ParDo} transform.
+   * @throws NullPointerException if {@code fn} is null
+   * @throws IllegalArgumentException if {@code fn} is not a splittable {@link DoFn}
+   */
+  public SplittableParDo(DoFn<InputT, OutputT> fn) {
+    this.fn = checkNotNull(fn, "fn must not be null");
+    this.signature = DoFnSignatures.INSTANCE.getOrParseSignature(this.fn.getClass());
+    boolean splittable = this.signature.processElement().isSplittable();
+    checkArgument(splittable, "fn must be a splittable DoFn");
+  }
+
+  /**
+   * Expands the splittable {@link DoFn} into the pair, split, key, group and process steps
+   * described in the class documentation.
+   *
+   * @throws IllegalArgumentException if the {@link GBKIntoKeyedWorkItems} step does not produce a
+   *     globally windowed collection
+   */
+  @Override
+  public PCollection<OutputT> apply(PCollection<InputT> input) {
+    PCollection.IsBounded isFnBounded = signature.isBoundedPerElement();
+    DoFnInvoker<InputT, OutputT> coderInvoker = DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn);
+    Coder<RestrictionT> restrictionCoder =
+        coderInvoker.invokeGetRestrictionCoder(input.getPipeline().getCoderRegistry());
+    Coder<ElementAndRestriction<InputT, RestrictionT>> splitCoder =
+        ElementAndRestrictionCoder.of(input.getCoder(), restrictionCoder);
+
+    // Pair every element with its initial restriction, then split that restriction.
+    PCollection<ElementAndRestriction<InputT, RestrictionT>> withInitialRestriction =
+        input
+            .apply(
+                "Pair with initial restriction",
+                ParDo.of(new PairWithRestrictionFn<InputT, OutputT, RestrictionT>(fn)))
+            .setCoder(splitCoder);
+    PCollection<ElementAndRestriction<InputT, RestrictionT>> splitRestrictions =
+        withInitialRestriction
+            .apply("Split restriction", ParDo.of(new SplitRestrictionFn<InputT, RestrictionT>(fn)))
+            .setCoder(splitCoder);
+
+    // Key every pair uniquely and group, so that per-key state and timers become per-pair.
+    PCollection<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>> keyedWorkItems =
+        splitRestrictions
+            .apply(
+                "Assign unique key",
+                WithKeys.of(new RandomUniqueKeyFn<ElementAndRestriction<InputT, RestrictionT>>()))
+            .apply(
+                "Group by key",
+                new GBKIntoKeyedWorkItems<String, ElementAndRestriction<InputT, RestrictionT>>());
+    checkArgument(
+        keyedWorkItems.getWindowingStrategy().getWindowFn() instanceof GlobalWindows,
+        "GBKIntoKeyedWorkItems must produce a globally windowed collection, "
+            + "but windowing strategy was: %s",
+        keyedWorkItems.getWindowingStrategy());
+
+    // Run the splittable DoFn on each keyed (element, restriction) pair.
+    ProcessFn<InputT, OutputT, RestrictionT, TrackerT> processFn =
+        new ProcessFn<InputT, OutputT, RestrictionT, TrackerT>(
+            fn,
+            input.getCoder(),
+            restrictionCoder,
+            input.getWindowingStrategy().getWindowFn().windowCoder());
+    PCollection<OutputT> processed = keyedWorkItems.apply("Process", ParDo.of(processFn));
+    return processed
+        .setIsBoundedInternal(input.isBounded().and(isFnBounded))
+        .setWindowingStrategyInternal(input.getWindowingStrategy());
+  }
+
+ /**
+ * Assigns a random unique key to each element of the input collection, so that the output
+ * collection is effectively the same elements as input, but the per-key state and timers are now
+ * effectively per-element.
+ */
+ private static class RandomUniqueKeyFn<T> implements SerializableFunction<T, String> {
+ @Override
+ public String apply(T input) {
+ return UUID.randomUUID().toString();
+ }
+ }
+
+ /**
+ * Pairs each input element with its initial restriction using the given splittable {@link DoFn}.
+ */
+ private static class PairWithRestrictionFn<InputT, OutputT, RestrictionT>
+ extends DoFn<InputT, ElementAndRestriction<InputT, RestrictionT>> {
+ private DoFn<InputT, OutputT> fn;
+ private transient DoFnInvoker<InputT, OutputT> invoker;
+
+ PairWithRestrictionFn(DoFn<InputT, OutputT> fn) {
+ this.fn = fn;
+ }
+
+ @Setup
+ public void setup() {
+ invoker = DoFnInvokers.INSTANCE.newByteBuddyInvoker(fn);
+ }
+
+ @ProcessElement
+ public void processElement(ProcessContext context) {
+ context.output(
+ ElementAndRestriction.of(
+ context.element(),
+ invoker.<RestrictionT>invokeGetInitialRestriction(context.element())));
+ }
+ }
+
+ /**
+ * The heart of splittable {@link DoFn} execution: processes a single (element, restriction) pair
+ * by creating a tracker for the restriction and checkpointing/resuming processing later if
+ * necessary.
+ *
+ * <p>TODO: This uses deprecated OldDoFn since DoFn does not provide access to state/timer
+ * internals. This should be rewritten to use the <a href="https://s.apache.org/beam-state">State
+ * and Timers API</a> once it is available.
+ */
+ @VisibleForTesting
+ static class ProcessFn<
+ InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker<RestrictionT>>
+ extends OldDoFn<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT> {
+    // Commit at least once every 10k output records. This keeps the watermark advancing
+    // smoothly, and ensures that not too much work will have to be reprocessed in the event of
+    // a crash.
+    // TODO: Also commit at least once every N seconds (runner-specific parameter).
+    @VisibleForTesting static final int MAX_OUTPUTS_PER_BUNDLE = 10000;
+
+    /**
+     * The state cell containing a watermark hold for the output of this {@link DoFn}.
+     *
+     * <p>A hold is needed to avoid letting the output watermark immediately progress together with
+     * the input watermark when the first {@link DoFn.ProcessElement} call for this element
+     * completes.
+     *
+     * <p>The hold is only added to when a {@link DoFn.ProcessElement} call returns a
+     * {@link DoFn.ProcessContinuation} that should resume and reports a non-null future output
+     * watermark. In that case, the reported watermark is added to the hold. A call that resumes
+     * without reporting a watermark leaves the hold unchanged. The hold is cleared, together with
+     * the rest of this element's state, once a call returns a continuation that should not resume
+     * (e.g. {@link DoFn.ProcessContinuation#stop}).
+     *
+     * <p>The hold is declared with {@link OutputTimeFns#outputAtLatestInputTimestamp()}.
+     */
+    private static final StateTag<Object, WatermarkHoldState<GlobalWindow>> watermarkHoldTag =
+        StateTags.makeSystemTagInternal(
+            StateTags.<GlobalWindow>watermarkStateInternal(
+                "hold", OutputTimeFns.outputAtLatestInputTimestamp()));
+
+    /**
+     * The state cell containing a copy of the element. Written during the first {@link
+     * DoFn.ProcessElement} call and read during subsequent calls in response to timer firings, when
+     * the original element is no longer available.
+     */
+    private final StateTag<Object, ValueState<WindowedValue<InputT>>> elementTag;
+
+    /**
+     * The state cell containing a restriction representing the unprocessed part of work for this
+     * element. Assigned once in the constructor.
+     */
+    private final StateTag<Object, ValueState<RestrictionT>> restrictionTag;
+
+    /** The splittable {@link DoFn} being executed. */
+    private final DoFn<InputT, OutputT> fn;
+
+    /** Coder for the windows of the input elements. */
+    private final Coder<? extends BoundedWindow> windowCoder;
+
+    /** Invoker for {@link #fn}. It is transient, so it is (re)created in {@link #setup()}. */
+    private transient DoFnInvoker<InputT, OutputT> invoker;
+
+    /**
+     * Creates a {@code ProcessFn}.
+     *
+     * @param fn the splittable {@link DoFn} to execute
+     * @param elementCoder coder of the input elements; combined with {@code windowCoder} to store
+     *     the windowed element in state
+     * @param restrictionCoder coder used to store the residual restriction in state
+     * @param windowCoder coder of the input windows
+     */
+    ProcessFn(
+        DoFn<InputT, OutputT> fn,
+        Coder<InputT> elementCoder,
+        Coder<RestrictionT> restrictionCoder,
+        Coder<? extends BoundedWindow> windowCoder) {
+      this.fn = fn;
+      this.windowCoder = windowCoder;
+      this.elementTag =
+          StateTags.value("element", WindowedValue.getFullCoder(elementCoder, this.windowCoder));
+      this.restrictionTag = StateTags.value("restriction", restrictionCoder);
+      // The invoker is not built here: the field is transient, so setup() creates it instead.
+      // (Previously an unused local shadowing the field was built here and discarded.)
+    }
+
+    /** Builds the (transient) invoker for the wrapped splittable {@link DoFn}. */
+    @Override
+    public void setup() throws Exception {
+      this.invoker = DoFnInvokers.INSTANCE.newByteBuddyInvoker(this.fn);
+    }
+
+ /**
+ * Processes one {@link KeyedWorkItem}. The work item carries either the seed (element,
+ * restriction) pair or a single timer firing for a pair that was checkpointed earlier.
+ *
+ * <p>On the seed call, the element is stored in state. On timer firings, the element and the
+ * remaining restriction are read back from state. A tracker is then created for the
+ * restriction, and the splittable {@link DoFn}'s {@link DoFn.ProcessElement} method is invoked.
+ *
+ * <p>If the returned continuation should resume:
+ * <ul>
+ * <li>the residual restriction is stored in state;
+ * <li>any future output watermark it reports is added to the watermark hold;
+ * <li>a processing-time timer is set {@code resumeDelay()} after the current processing time.
+ * </ul>
+ * Otherwise the element, restriction and hold state are all cleared.
+ */
+ @Override
+ public void processElement(final ProcessContext c) {
+ // Initialize state (element and restriction) depending on whether this is the seed call.
+ // The seed call is the first call for this element, which actually has the element.
+ // Subsequent calls are timer firings and the element has to be retrieved from the state.
+ // getOnlyElement(..., null) yields null when there is no timer, and throws if there are several.
+ TimerInternals.TimerData timer = Iterables.getOnlyElement(c.element().timersIterable(), null);
+ boolean isSeedCall = (timer == null);
+ // The timer set at the end of this method uses stateNamespace, so on a firing the timer's
+ // namespace addresses the same state cells as the call that set it.
+ StateNamespace stateNamespace = isSeedCall ? StateNamespaces.global() : timer.getNamespace();
+ ValueState<WindowedValue<InputT>> elementState =
+ c.windowingInternals().stateInternals().state(stateNamespace, elementTag);
+ ValueState<RestrictionT> restrictionState =
+ c.windowingInternals().stateInternals().state(stateNamespace, restrictionTag);
+ WatermarkHoldState<GlobalWindow> holdState =
+ c.windowingInternals().stateInternals().state(stateNamespace, watermarkHoldTag);
+
+ ElementAndRestriction<WindowedValue<InputT>, RestrictionT> elementAndRestriction;
+ if (isSeedCall) {
+ // The element and restriction are available in c.element().
+ // Only the element is persisted here; the restriction is written below if processing resumes.
+ WindowedValue<ElementAndRestriction<InputT, RestrictionT>> windowedValue =
+ Iterables.getOnlyElement(c.element().elementsIterable());
+ WindowedValue<InputT> element = windowedValue.withValue(windowedValue.getValue().element());
+ elementState.write(element);
+ elementAndRestriction =
+ ElementAndRestriction.of(element, windowedValue.getValue().restriction());
+ } else {
+ // This is not the first ProcessElement call for this element/restriction - rather,
+ // this is a timer firing, so we need to fetch the element and restriction from state.
+ elementState.readLater();
+ restrictionState.readLater();
+ elementAndRestriction =
+ ElementAndRestriction.of(elementState.read(), restrictionState.read());
+ }
+
+ final TrackerT tracker = invoker.invokeNewTracker(elementAndRestriction.restriction());
+ // One-slot holder, so that the context built by makeContext() can record a checkpointed
+ // residual restriction from inside the DoFn call.
+ @SuppressWarnings("unchecked")
+ final RestrictionT[] residual = (RestrictionT[]) new Object[1];
+ // TODO: Only let the call run for a limited amount of time, rather than simply
+ // producing a limited amount of output.
+ DoFn.ProcessContinuation cont =
+ invoker.invokeProcessElement(
+ makeContext(c, elementAndRestriction.element(), tracker, residual),
+ wrapTracker(tracker));
+ if (residual[0] == null) {
+ // This means the call completed unsolicited, and the context produced by makeContext()
+ // did not take a checkpoint. Take one now.
+ residual[0] = checkNotNull(tracker.checkpoint());
+ }
+
+ // Save state for resuming.
+ // If the continuation says not to resume, the residual computed above is discarded.
+ if (!cont.shouldResume()) {
+ // All work for this element/restriction is completed. Clear state and release hold.
+ elementState.clear();
+ restrictionState.clear();
+ holdState.clear();
+ return;
+ }
+ restrictionState.write(residual[0]);
+ Instant futureOutputWatermark = cont.getWatermark();
+ // The hold is only added to when the continuation reports a future output watermark.
+ if (futureOutputWatermark != null) {
+ holdState.add(futureOutputWatermark);
+ }
+ // Set a timer to continue processing this element.
+ TimerInternals timerInternals = c.windowingInternals().timerInternals();
+ timerInternals.setTimer(
+ TimerInternals.TimerData.of(
+ stateNamespace,
+ timerInternals.currentProcessingTime().plus(cont.resumeDelay()),
+ TimeDomain.PROCESSING_TIME));
+ }
+
+    /**
+     * Creates the {@link DoFn.ProcessContext} handed to the splittable fn for a single
+     * {@code @ProcessElement} call on {@code element}.
+     *
+     * <p>Main outputs are emitted through {@code baseContext}'s windowing internals with the
+     * element's windows and pane, and with the element's timestamp unless an explicit one is
+     * given. Once {@code MAX_OUTPUTS_PER_BUNDLE} outputs have been produced, the context asks
+     * {@code tracker} for a checkpoint (at most once) and stores the residual restriction in
+     * {@code residualRestrictionHolder[0]}. Side outputs are not supported yet.
+     *
+     * @param baseContext context of the enclosing call; supplies windowing internals, side inputs
+     *     and pipeline options
+     * @param element the element being processed, with its timestamp, windows and pane
+     * @param tracker tracker for the restriction being processed by this call
+     * @param residualRestrictionHolder one-slot array that receives the checkpoint residual;
+     *     expected to hold {@code null} on entry
+     * @return a context bound to {@code element}
+     */
+    private DoFn<InputT, OutputT>.ProcessContext makeContext(
+        final ProcessContext baseContext,
+        final WindowedValue<InputT> element,
+        final TrackerT tracker,
+        final RestrictionT[] residualRestrictionHolder) {
+      return fn.new ProcessContext() {
+        /** Number of main outputs emitted through this context so far. */
+        private int numOutputs = 0;
+
+        @Override
+        public InputT element() {
+          return element.getValue();
+        }
+
+        @Override
+        public Instant timestamp() {
+          return element.getTimestamp();
+        }
+
+        @Override
+        public PaneInfo pane() {
+          return element.getPane();
+        }
+
+        @Override
+        public void output(OutputT output) {
+          baseContext
+              .windowingInternals()
+              .outputWindowedValue(
+                  output, element.getTimestamp(), element.getWindows(), element.getPane());
+          noteOutput();
+        }
+
+        @Override
+        public void outputWithTimestamp(OutputT output, Instant timestamp) {
+          baseContext
+              .windowingInternals()
+              .outputWindowedValue(output, timestamp, element.getWindows(), element.getPane());
+          noteOutput();
+        }
+
+        /**
+         * Counts one output and, once the per-call output budget is exhausted, takes a checkpoint.
+         *
+         * <p>{@code tracker.checkpoint()} is called at most once per call. The residual it
+         * returns is the restriction the caller persists and later resumes from, so a second
+         * checkpoint must not be taken and must not replace it.
+         */
+        private void noteOutput() {
+          ++numOutputs;
+          if (numOutputs >= MAX_OUTPUTS_PER_BUNDLE && residualRestrictionHolder[0] == null) {
+            // Request a checkpoint. The fn *may* produce more output, but hopefully not too much.
+            residualRestrictionHolder[0] = tracker.checkpoint();
+          }
+        }
+
+        @Override
+        public <T> T sideInput(PCollectionView<T> view) {
+          return baseContext.sideInput(view);
+        }
+
+        @Override
+        public PipelineOptions getPipelineOptions() {
+          return baseContext.getPipelineOptions();
+        }
+
+        @Override
+        public <T> void sideOutput(TupleTag<T> tag, T output) {
+          // TODO: I'm not sure how to implement this correctly: there's no
+          // "internals.sideOutputWindowedValue".
+          throw new UnsupportedOperationException(
+              "Side outputs not yet supported by splittable DoFn");
+        }
+
+        @Override
+        public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
+          // TODO: I'm not sure how to implement this correctly: there's no
+          // "internals.sideOutputWindowedValue".
+          throw new UnsupportedOperationException(
+              "Side outputs not yet supported by splittable DoFn");
+        }
+
+        @Override
+        protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregator(
+            String name, Combine.CombineFn<AggInputT, ?, AggOutputT> combiner) {
+          return fn.createAggregator(name, combiner);
+        }
+      };
+    }
+
+    /**
+     * Wraps {@code tracker} in a {@link DoFn.ExtraContextFactory} whose only usable accessor is the
+     * restriction tracker.
+     */
+    private DoFn.ExtraContextFactory<InputT, OutputT> wrapTracker(final TrackerT tracker) {
+      DoFn.ExtraContextFactory<InputT, OutputT> factory =
+          new ExtraContextFactoryForTracker<InputT, OutputT, TrackerT>(tracker);
+      return factory;
+    }
+
+    /**
+     * An {@link DoFn.ExtraContextFactory} that supplies a restriction tracker and nothing else.
+     * Every other accessor throws {@link IllegalStateException}.
+     */
+    private static class ExtraContextFactoryForTracker<
+            InputT, OutputT, TrackerT extends RestrictionTracker<?>>
+        implements DoFn.ExtraContextFactory<InputT, OutputT> {
+      /** The single piece of extra context this factory exposes. */
+      private final TrackerT tracker;
+
+      ExtraContextFactoryForTracker(TrackerT tracker) {
+        this.tracker = tracker;
+      }
+
+      /**
+       * Builds the exception thrown by every accessor other than {@link #restrictionTracker()}.
+       * DoFnSignatures should already have verified that the DoFn doesn't access extra context.
+       */
+      private static IllegalStateException unexpectedAccess() {
+        return new IllegalStateException("Unexpected extra context access on a splittable DoFn");
+      }
+
+      @Override
+      public BoundedWindow window() {
+        throw unexpectedAccess();
+      }
+
+      @Override
+      public DoFn.InputProvider<InputT> inputProvider() {
+        throw unexpectedAccess();
+      }
+
+      @Override
+      public DoFn.OutputReceiver<OutputT> outputReceiver() {
+        throw unexpectedAccess();
+      }
+
+      @Override
+      public WindowingInternals<InputT, OutputT> windowingInternals() {
+        throw unexpectedAccess();
+      }
+
+      @Override
+      public TrackerT restrictionTracker() {
+        return tracker;
+      }
+    }
+ }
+
+ /** Splits the restriction using the given {@link DoFn.SplitRestriction} method. */
+ private static class SplitRestrictionFn<InputT, RestrictionT>
+ extends DoFn<
+ ElementAndRestriction<InputT, RestrictionT>,
+ ElementAndRestriction<InputT, RestrictionT>> {
+ private final DoFn<InputT, ?> splittableFn;
+ private transient DoFnInvoker<InputT, ?> invoker;
+
+ SplitRestrictionFn(DoFn<InputT, ?> splittableFn) {
+ this.splittableFn = splittableFn;
+ }
+
+ @Setup
+ public void setup() {
+ invoker = DoFnInvokers.INSTANCE.newByteBuddyInvoker(splittableFn);
+ }
+
+ @ProcessElement
+ public void processElement(final ProcessContext c) {
+ final InputT element = c.element().element();
+ invoker.invokeSplitRestriction(
+ element,
+ c.element().restriction(),
+ new OutputReceiver<RestrictionT>() {
+ @Override
+ public void output(RestrictionT part) {
+ c.output(ElementAndRestriction.of(element, part));
+ }
+ });
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0a24883/runners/core-java/src/test/java/org/apache/beam/runners/core/ElementAndRestrictionCoderTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ElementAndRestrictionCoderTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ElementAndRestrictionCoderTest.java
new file mode 100644
index 0000000..f516046
--- /dev/null
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ElementAndRestrictionCoderTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.coders.BigEndianLongCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.ListCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.testing.CoderProperties;
+import org.apache.beam.sdk.util.CoderUtils;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+
+/**
+ * Tests for {@link ElementAndRestrictionCoder}. Parroted from {@link
+ * org.apache.beam.sdk.coders.KvCoderTest}.
+ */
+@RunWith(Parameterized.class)
+public class ElementAndRestrictionCoderTest<K, V> {
+ private static class CoderAndData<T> {
+ Coder<T> coder;
+ List<T> data;
+ }
+
+ private static class AnyCoderAndData {
+ private CoderAndData<?> coderAndData;
+ }
+
+ private static <T> AnyCoderAndData coderAndData(Coder<T> coder, List<T> data) {
+ CoderAndData<T> coderAndData = new CoderAndData<>();
+ coderAndData.coder = coder;
+ coderAndData.data = data;
+ AnyCoderAndData res = new AnyCoderAndData();
+ res.coderAndData = coderAndData;
+ return res;
+ }
+
+ private static final List<AnyCoderAndData> TEST_DATA =
+ Arrays.asList(
+ coderAndData(
+ VarIntCoder.of(), Arrays.asList(-1, 0, 1, 13, Integer.MAX_VALUE, Integer.MIN_VALUE)),
+ coderAndData(
+ BigEndianLongCoder.of(),
+ Arrays.asList(-1L, 0L, 1L, 13L, Long.MAX_VALUE, Long.MIN_VALUE)),
+ coderAndData(StringUtf8Coder.of(), Arrays.asList("", "hello", "goodbye", "1")),
+ coderAndData(
+ ElementAndRestrictionCoder.of(StringUtf8Coder.of(), VarIntCoder.of()),
+ Arrays.asList(
+ ElementAndRestriction.of("", -1),
+ ElementAndRestriction.of("hello", 0),
+ ElementAndRestriction.of("goodbye", Integer.MAX_VALUE))),
+ coderAndData(
+ ListCoder.of(VarLongCoder.of()),
+ Arrays.asList(Arrays.asList(1L, 2L, 3L), Collections.<Long>emptyList())));
+
+ @Parameterized.Parameters(name = "{index}: keyCoder={0} key={1} valueCoder={2} value={3}")
+ public static Collection<Object[]> data() {
+ List<Object[]> parameters = new ArrayList<>();
+ for (AnyCoderAndData keyCoderAndData : TEST_DATA) {
+ Coder keyCoder = keyCoderAndData.coderAndData.coder;
+ for (Object key : keyCoderAndData.coderAndData.data) {
+ for (AnyCoderAndData valueCoderAndData : TEST_DATA) {
+ Coder valueCoder = valueCoderAndData.coderAndData.coder;
+ for (Object value : valueCoderAndData.coderAndData.data) {
+ parameters.add(new Object[] {keyCoder, key, valueCoder, value});
+ }
+ }
+ }
+ }
+ return parameters;
+ }
+
+ @Parameter(0)
+ public Coder<K> keyCoder;
+ @Parameter(1)
+ public K key;
+ @Parameter(2)
+ public Coder<V> valueCoder;
+ @Parameter(3)
+ public V value;
+
+ @Test
+ @SuppressWarnings("rawtypes")
+ public void testDecodeEncodeEqual() throws Exception {
+ CoderProperties.coderDecodeEncodeEqual(
+ ElementAndRestrictionCoder.of(keyCoder, valueCoder),
+ ElementAndRestriction.of(key, value));
+ }
+
+ @Rule public ExpectedException thrown = ExpectedException.none();
+
+ @Test
+ public void encodeNullThrowsCoderException() throws Exception {
+ thrown.expect(CoderException.class);
+ thrown.expectMessage("cannot encode a null ElementAndRestriction");
+
+ CoderUtils.encodeToBase64(
+ ElementAndRestrictionCoder.of(StringUtf8Coder.of(), VarIntCoder.of()), null);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0a24883/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
new file mode 100644
index 0000000..a76c4da
--- /dev/null
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
@@ -0,0 +1,467 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import static org.apache.beam.sdk.transforms.DoFn.ProcessContinuation.resume;
+import static org.apache.beam.sdk.transforms.DoFn.ProcessContinuation.stop;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.not;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.testing.RunnableOnService;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFnTester;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.KeyedWorkItem;
+import org.apache.beam.sdk.util.KeyedWorkItems;
+import org.apache.beam.sdk.util.TimerInternals;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link SplittableParDo}. */
+@RunWith(JUnit4.class)
+public class SplittableParDoTest {
+  // ----------------- Tests for whether the transform sets boundedness correctly --------------
+
+  /** A restriction without state; the fake fns below just need some restriction type. */
+  private static class SomeRestriction implements Serializable {}
+
+  /** A tracker whose current restriction and checkpoint are always the same instance. */
+  private static class SomeRestrictionTracker implements RestrictionTracker<SomeRestriction> {
+    private final SomeRestriction someRestriction = new SomeRestriction();
+
+    @Override
+    public SomeRestriction currentRestriction() {
+      return someRestriction;
+    }
+
+    @Override
+    public SomeRestriction checkpoint() {
+      return someRestriction;
+    }
+  }
+
+  /**
+   * A fake splittable fn whose {@code @ProcessElement} method returns {@code void}; contrast with
+   * {@link UnboundedFakeFn}. Its restriction methods return {@code null}, so it is only suitable
+   * for pipeline construction.
+   */
+  private static class BoundedFakeFn extends DoFn<Integer, String> {
+    @ProcessElement
+    public void processElement(ProcessContext context, SomeRestrictionTracker tracker) {}
+
+    @GetInitialRestriction
+    public SomeRestriction getInitialRestriction(Integer element) {
+      return null;
+    }
+
+    @NewTracker
+    public SomeRestrictionTracker newTracker(SomeRestriction restriction) {
+      return null;
+    }
+  }
+
+  /**
+   * Like {@link BoundedFakeFn}, except that its {@code @ProcessElement} method returns a {@link
+   * DoFn.ProcessContinuation}.
+   */
+  private static class UnboundedFakeFn extends DoFn<Integer, String> {
+    @ProcessElement
+    public ProcessContinuation processElement(
+        ProcessContext context, SomeRestrictionTracker tracker) {
+      return stop();
+    }
+
+    @GetInitialRestriction
+    public SomeRestriction getInitialRestriction(Integer element) {
+      return null;
+    }
+
+    @NewTracker
+    public SomeRestrictionTracker newTracker(SomeRestriction restriction) {
+      return null;
+    }
+  }
+
+  /** Creates a small collection explicitly marked as unbounded. */
+  private static PCollection<Integer> makeUnboundedCollection(Pipeline pipeline) {
+    return pipeline
+        .apply("unbounded", Create.of(1, 2, 3))
+        .setIsBoundedInternal(PCollection.IsBounded.UNBOUNDED);
+  }
+
+  /** Creates a small collection explicitly marked as bounded. */
+  private static PCollection<Integer> makeBoundedCollection(Pipeline pipeline) {
+    return pipeline
+        .apply("bounded", Create.of(1, 2, 3))
+        .setIsBoundedInternal(PCollection.IsBounded.BOUNDED);
+  }
+
+  // Expected behavior: the output is bounded only when both the input collection and the fn are
+  // bounded. An unbounded input can never yield a bounded output.
+
+  @Test
+  @Category(RunnableOnService.class)
+  public void testBoundednessForBoundedFn() {
+    Pipeline pipeline = TestPipeline.create();
+    DoFn<Integer, String> boundedFn = new BoundedFakeFn();
+    assertEquals(
+        "Applying a bounded SDF to a bounded collection produces a bounded collection",
+        PCollection.IsBounded.BOUNDED,
+        makeBoundedCollection(pipeline)
+            .apply("bounded to bounded", new SplittableParDo<>(boundedFn))
+            .isBounded());
+    assertEquals(
+        "Applying a bounded SDF to an unbounded collection produces an unbounded collection",
+        PCollection.IsBounded.UNBOUNDED,
+        makeUnboundedCollection(pipeline)
+            .apply("bounded to unbounded", new SplittableParDo<>(boundedFn))
+            .isBounded());
+  }
+
+  @Test
+  @Category(RunnableOnService.class)
+  public void testBoundednessForUnboundedFn() {
+    Pipeline pipeline = TestPipeline.create();
+    DoFn<Integer, String> unboundedFn = new UnboundedFakeFn();
+    assertEquals(
+        "Applying an unbounded SDF to a bounded collection produces an unbounded collection",
+        PCollection.IsBounded.UNBOUNDED,
+        makeBoundedCollection(pipeline)
+            .apply("unbounded to bounded", new SplittableParDo<>(unboundedFn))
+            .isBounded());
+    assertEquals(
+        "Applying an unbounded SDF to an unbounded collection produces an unbounded collection",
+        PCollection.IsBounded.UNBOUNDED,
+        makeUnboundedCollection(pipeline)
+            .apply("unbounded to unbounded", new SplittableParDo<>(unboundedFn))
+            .isBounded());
+  }
+
+  // ------------------------------- Tests for ProcessFn ---------------------------------
+
+  /**
+   * A helper for testing {@link SplittableParDo.ProcessFn} on 1 element (but possibly over multiple
+   * {@link DoFn.ProcessElement} calls).
+   */
+  private static class ProcessFnTester<
+      InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker<RestrictionT>> {
+    private final DoFnTester<
+            KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT>
+        tester;
+    private Instant currentProcessingTime;
+
+    ProcessFnTester(
+        Instant currentProcessingTime,
+        DoFn<InputT, OutputT> fn,
+        Coder<InputT> inputCoder,
+        Coder<RestrictionT> restrictionCoder)
+        throws Exception {
+      SplittableParDo.ProcessFn<InputT, OutputT, RestrictionT, TrackerT> processFn =
+          new SplittableParDo.ProcessFn<>(
+              fn, inputCoder, restrictionCoder, IntervalWindow.getCoder());
+      this.tester = DoFnTester.of(processFn);
+      this.tester.startBundle();
+      this.tester.advanceProcessingTime(currentProcessingTime);
+
+      this.currentProcessingTime = currentProcessingTime;
+    }
+
+    /** Performs a seed {@link DoFn.ProcessElement} call feeding the element and restriction. */
+    void startElement(InputT element, RestrictionT restriction) throws Exception {
+      startElement(
+          WindowedValue.of(
+              ElementAndRestriction.of(element, restriction),
+              currentProcessingTime,
+              GlobalWindow.INSTANCE,
+              PaneInfo.ON_TIME_AND_ONLY_FIRING));
+    }
+
+    /** Performs a seed {@link DoFn.ProcessElement} call feeding the given windowed value. */
+    void startElement(WindowedValue<ElementAndRestriction<InputT, RestrictionT>> windowedValue)
+        throws Exception {
+      tester.processElement(KeyedWorkItems.elementsWorkItem("key", Arrays.asList(windowedValue)));
+    }
+
+    /**
+     * Advances processing time by a given duration and, if any timers fired, performs a non-seed
+     * {@link DoFn.ProcessElement} call, feeding it the timers.
+     *
+     * @return whether any timers fired
+     */
+    boolean advanceProcessingTimeBy(Duration duration) throws Exception {
+      currentProcessingTime = currentProcessingTime.plus(duration);
+      List<TimerInternals.TimerData> timers = tester.advanceProcessingTime(currentProcessingTime);
+      if (timers.isEmpty()) {
+        return false;
+      }
+      tester.processElement(
+          KeyedWorkItems.<String, ElementAndRestriction<InputT, RestrictionT>>timersWorkItem(
+              "key", timers));
+      return true;
+    }
+
+    /** Returns the outputs emitted so far in {@code window}, without consuming them. */
+    List<TimestampedValue<OutputT>> peekOutputElementsInWindow(BoundedWindow window) {
+      return tester.peekOutputElementsInWindow(window);
+    }
+
+    /** Returns and consumes the outputs emitted so far. */
+    List<OutputT> takeOutputElements() {
+      return tester.takeOutputElements();
+    }
+  }
+
+  /** A simple splittable {@link DoFn} that's actually monolithic. */
+  private static class ToStringFn extends DoFn<Integer, String> {
+    @ProcessElement
+    public void process(ProcessContext c, SomeRestrictionTracker tracker) {
+      c.output(c.element().toString() + "a");
+      c.output(c.element().toString() + "b");
+      c.output(c.element().toString() + "c");
+    }
+
+    @GetInitialRestriction
+    public SomeRestriction getInitialRestriction(Integer elem) {
+      return new SomeRestriction();
+    }
+
+    @NewTracker
+    public SomeRestrictionTracker newTracker(SomeRestriction restriction) {
+      return new SomeRestrictionTracker();
+    }
+  }
+
+  @Test
+  public void testTrivialProcessFnPropagatesOutputsWindowsAndTimestamp() throws Exception {
+    // Tests that ProcessFn correctly propagates windows and timestamp of the element
+    // inside the KeyedWorkItem.
+    // The underlying DoFn is actually monolithic, so this doesn't test splitting.
+    DoFn<Integer, String> fn = new ToStringFn();
+
+    Instant base = Instant.now();
+    ProcessFnTester<Integer, String, SomeRestriction, SomeRestrictionTracker> tester =
+        new ProcessFnTester<>(
+            base, fn, BigEndianIntegerCoder.of(), SerializableCoder.of(SomeRestriction.class));
+
+    IntervalWindow w1 =
+        new IntervalWindow(
+            base.minus(Duration.standardMinutes(1)), base.plus(Duration.standardMinutes(1)));
+    IntervalWindow w2 =
+        new IntervalWindow(
+            base.minus(Duration.standardMinutes(2)), base.plus(Duration.standardMinutes(2)));
+    IntervalWindow w3 =
+        new IntervalWindow(
+            base.minus(Duration.standardMinutes(3)), base.plus(Duration.standardMinutes(3)));
+
+    tester.startElement(
+        WindowedValue.of(
+            ElementAndRestriction.of(42, new SomeRestriction()),
+            base,
+            Arrays.asList(w1, w2, w3),
+            PaneInfo.ON_TIME_AND_ONLY_FIRING));
+
+    for (IntervalWindow w : new IntervalWindow[] {w1, w2, w3}) {
+      assertEquals(
+          Arrays.asList(
+              TimestampedValue.of("42a", base),
+              TimestampedValue.of("42b", base),
+              TimestampedValue.of("42c", base)),
+          tester.peekOutputElementsInWindow(w));
+    }
+  }
+
+  /** A simple splittable {@link DoFn} that outputs the given element every 5 seconds forever. */
+  private static class SelfInitiatedResumeFn extends DoFn<Integer, String> {
+    @ProcessElement
+    public ProcessContinuation process(ProcessContext c, SomeRestrictionTracker tracker) {
+      c.output(c.element().toString());
+      return resume().withResumeDelay(Duration.standardSeconds(5)).withWatermark(c.timestamp());
+    }
+
+    @GetInitialRestriction
+    public SomeRestriction getInitialRestriction(Integer elem) {
+      return new SomeRestriction();
+    }
+
+    @NewTracker
+    public SomeRestrictionTracker newTracker(SomeRestriction restriction) {
+      return new SomeRestrictionTracker();
+    }
+  }
+
+  @Test
+  public void testResumeSetsTimer() throws Exception {
+    DoFn<Integer, String> fn = new SelfInitiatedResumeFn();
+    Instant base = Instant.now();
+    ProcessFnTester<Integer, String, SomeRestriction, SomeRestrictionTracker> tester =
+        new ProcessFnTester<>(
+            base, fn, BigEndianIntegerCoder.of(), SerializableCoder.of(SomeRestriction.class));
+
+    tester.startElement(42, new SomeRestriction());
+    assertThat(tester.takeOutputElements(), contains("42"));
+
+    // Should resume after 5 seconds: advancing by 3 seconds should have no effect.
+    assertFalse(tester.advanceProcessingTimeBy(Duration.standardSeconds(3)));
+    assertTrue(tester.takeOutputElements().isEmpty());
+
+    // 6 seconds should be enough - should invoke the fn again.
+    assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(3)));
+    assertThat(tester.takeOutputElements(), contains("42"));
+
+    // Should again resume after 5 seconds: advancing by 3 seconds should again have no effect.
+    assertFalse(tester.advanceProcessingTimeBy(Duration.standardSeconds(3)));
+    assertTrue(tester.takeOutputElements().isEmpty());
+
+    // 6 seconds should again be enough.
+    assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(3)));
+    assertThat(tester.takeOutputElements(), contains("42"));
+  }
+
+  /** A restriction recording the index of the first output that has not been produced yet. */
+  private static class SomeCheckpoint implements Serializable {
+    private int firstUnprocessedIndex;
+
+    private SomeCheckpoint(int firstUnprocessedIndex) {
+      this.firstUnprocessedIndex = firstUnprocessedIndex;
+    }
+  }
+
+  /** A tracker that accepts progress updates until {@link #checkpoint()} is called. */
+  private static class SomeCheckpointTracker implements RestrictionTracker<SomeCheckpoint> {
+    private SomeCheckpoint current;
+    private boolean isActive = true;
+
+    private SomeCheckpointTracker(SomeCheckpoint current) {
+      this.current = current;
+    }
+
+    @Override
+    public SomeCheckpoint currentRestriction() {
+      return current;
+    }
+
+    /** Records progress; returns {@code false} once a checkpoint has been taken. */
+    public boolean tryUpdateCheckpoint(int firstUnprocessedIndex) {
+      if (!isActive) {
+        return false;
+      }
+      current = new SomeCheckpoint(firstUnprocessedIndex);
+      return true;
+    }
+
+    @Override
+    public SomeCheckpoint checkpoint() {
+      isActive = false;
+      return current;
+    }
+  }
+
+  /**
+   * A splittable {@link DoFn} that generates the sequence [init, init + total) in batches of given
+   * size.
+   */
+  private static class CounterFn extends DoFn<Integer, String> {
+    private final int numTotalOutputs;
+    private final int numOutputsPerCall;
+
+    private CounterFn(int numTotalOutputs, int numOutputsPerCall) {
+      this.numTotalOutputs = numTotalOutputs;
+      this.numOutputsPerCall = numOutputsPerCall;
+    }
+
+    @ProcessElement
+    public ProcessContinuation process(ProcessContext c, SomeCheckpointTracker tracker) {
+      int start = tracker.currentRestriction().firstUnprocessedIndex;
+      for (int i = 0; i < numOutputsPerCall; ++i) {
+        int index = start + i;
+        if (!tracker.tryUpdateCheckpoint(index + 1)) {
+          return resume();
+        }
+        if (index >= numTotalOutputs) {
+          return stop();
+        }
+        c.output(String.valueOf(c.element() + index));
+      }
+      return resume();
+    }
+
+    @GetInitialRestriction
+    public SomeCheckpoint getInitialRestriction(Integer elem) {
+      throw new UnsupportedOperationException("Expected to be supplied explicitly in this test");
+    }
+
+    @NewTracker
+    public SomeCheckpointTracker newTracker(SomeCheckpoint restriction) {
+      return new SomeCheckpointTracker(restriction);
+    }
+  }
+
+  @Test
+  public void testResumeCarriesOverState() throws Exception {
+    DoFn<Integer, String> fn = new CounterFn(3, 1);
+    Instant base = Instant.now();
+    ProcessFnTester<Integer, String, SomeCheckpoint, SomeCheckpointTracker> tester =
+        new ProcessFnTester<>(
+            base, fn, BigEndianIntegerCoder.of(), SerializableCoder.of(SomeCheckpoint.class));
+
+    tester.startElement(42, new SomeCheckpoint(0));
+    assertThat(tester.takeOutputElements(), contains("42"));
+    assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(1)));
+    assertThat(tester.takeOutputElements(), contains("43"));
+    assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(1)));
+    assertThat(tester.takeOutputElements(), contains("44"));
+    assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(1)));
+    // After outputting all 3 items, should not output anything more.
+    assertEquals(0, tester.takeOutputElements().size());
+    // Should also not ask to resume.
+    assertFalse(tester.advanceProcessingTimeBy(Duration.standardSeconds(1)));
+  }
+
+  @Test
+  public void testReactsToCheckpoint() throws Exception {
+    int max = SplittableParDo.ProcessFn.MAX_OUTPUTS_PER_BUNDLE;
+    // Create an fn that attempts to produce twice as many outputs per call as checkpointing
+    // allows.
+    DoFn<Integer, String> fn = new CounterFn(2 * max + max / 2, 2 * max);
+    Instant base = Instant.now();
+    int baseIndex = 42;
+
+    ProcessFnTester<Integer, String, SomeCheckpoint, SomeCheckpointTracker> tester =
+        new ProcessFnTester<>(
+            base, fn, BigEndianIntegerCoder.of(), SerializableCoder.of(SomeCheckpoint.class));
+
+    List<String> elements;
+
+    tester.startElement(baseIndex, new SomeCheckpoint(0));
+    elements = tester.takeOutputElements();
+    assertEquals(max, elements.size());
+    // Should output the range [0, max)
+    assertThat(elements, hasItem(String.valueOf(baseIndex)));
+    assertThat(elements, hasItem(String.valueOf(baseIndex + max - 1)));
+
+    assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(1)));
+    elements = tester.takeOutputElements();
+    assertEquals(max, elements.size());
+    // Should output the range [max, 2*max)
+    assertThat(elements, hasItem(String.valueOf(baseIndex + max)));
+    assertThat(elements, hasItem(String.valueOf(baseIndex + 2 * max - 1)));
+
+    assertTrue(tester.advanceProcessingTimeBy(Duration.standardSeconds(1)));
+    elements = tester.takeOutputElements();
+    assertEquals(max / 2, elements.size());
+    // Should output the range [2*max, 2*max + max/2)
+    assertThat(elements, hasItem(String.valueOf(baseIndex + 2 * max)));
+    assertThat(elements, hasItem(String.valueOf(baseIndex + 2 * max + max / 2 - 1)));
+    assertThat(elements, not(hasItem(String.valueOf(baseIndex + 2 * max + max / 2))));
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0a24883/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java
new file mode 100644
index 0000000..e232552
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import org.apache.beam.runners.core.GBKIntoKeyedWorkItems;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.KeyedWorkItem;
+import org.apache.beam.sdk.util.KeyedWorkItemCoder;
+import org.apache.beam.sdk.util.ReifyTimestampsAndWindows;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+
+/** Provides an implementation of {@link GBKIntoKeyedWorkItems} for the Direct Runner. */
+class DirectGBKIntoKeyedWorkItemsOverrideFactory implements PTransformOverrideFactory {
+  /**
+   * Replaces {@code transform} with a {@link DirectGBKIntoKeyedWorkItems} of the same name.
+   *
+   * <p>The DirectRunner registers this factory for {@link GBKIntoKeyedWorkItems}. The replacement
+   * is created as a raw type, so this method relies on {@code transform} really being a
+   * {@link GBKIntoKeyedWorkItems} for the generic types to line up.
+   */
+  @Override
+  @SuppressWarnings({"unchecked", "rawtypes"})
+  public <InputT extends PInput, OutputT extends POutput> PTransform<InputT, OutputT> override(
+      PTransform<InputT, OutputT> transform) {
+    return new DirectGBKIntoKeyedWorkItems(transform.getName());
+  }
+
+  /** The Direct Runner specific implementation of {@link GBKIntoKeyedWorkItems}. */
+  private static class DirectGBKIntoKeyedWorkItems<KeyT, InputT>
+      extends PTransform<PCollection<KV<KeyT, InputT>>, PCollection<KeyedWorkItem<KeyT, InputT>>> {
+    DirectGBKIntoKeyedWorkItems(String name) {
+      super(name);
+    }
+
+    /**
+     * Reifies timestamps and windows, groups by key with {@link DirectGroupByKey.DirectGroupByKeyOnly}
+     * and sets a {@link KeyedWorkItemCoder} on the result.
+     *
+     * @throws IllegalArgumentException if the input's coder is not a {@link KvCoder}
+     */
+    @Override
+    public PCollection<KeyedWorkItem<KeyT, InputT>> apply(PCollection<KV<KeyT, InputT>> input) {
+      checkArgument(
+          input.getCoder() instanceof KvCoder,
+          "Expected input coder to be a KvCoder, but was %s",
+          input.getCoder());
+      @SuppressWarnings("unchecked")
+      KvCoder<KeyT, InputT> kvCoder = (KvCoder<KeyT, InputT>) input.getCoder();
+      return input
+          .apply(new ReifyTimestampsAndWindows<KeyT, InputT>())
+          // TODO: Perhaps windowing strategy should instead be set by ReifyTAW, or by DGBKO
+          .setWindowingStrategyInternal(WindowingStrategy.globalDefault())
+          .apply(new DirectGroupByKey.DirectGroupByKeyOnly<KeyT, InputT>())
+          .setCoder(
+              KeyedWorkItemCoder.of(
+                  kvCoder.getKeyCoder(),
+                  kvCoder.getValueCoder(),
+                  // The window coder comes from the input's original windowing strategy, not from
+                  // the global default set above; presumably because the reified values keep
+                  // their original windows (TODO confirm).
+                  input.getWindowingStrategy().getWindowFn().windowCoder()));
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0a24883/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index 224101a..a72f7ae 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -29,6 +29,7 @@ import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.annotation.Nullable;
+import org.apache.beam.runners.core.GBKIntoKeyedWorkItems;
import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupByKeyOnly;
import org.apache.beam.runners.direct.DirectRunner.DirectPipelineResult;
import org.apache.beam.runners.direct.TestStreamEvaluatorFactory.DirectTestStreamFactory;
@@ -83,6 +84,10 @@ public class DirectRunner
.put(GroupByKey.class, new DirectGroupByKeyOverrideFactory())
.put(TestStream.class, new DirectTestStreamFactory())
.put(Write.Bound.class, new WriteWithShardingFactory())
+ .put(ParDo.Bound.class, new ParDoOverrideFactory())
+ .put(
+ GBKIntoKeyedWorkItems.class,
+ new DirectGBKIntoKeyedWorkItemsOverrideFactory())
.build();
/**
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0a24883/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoOverrideFactory.java
new file mode 100644
index 0000000..a57735c
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoOverrideFactory.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.runners.core.SplittableParDo;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFnAdapters;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+
+/**
+ * A {@link PTransformOverrideFactory} used by the direct runner for {@link ParDo} applications.
+ *
+ * <p>When the wrapped {@link DoFn} is <a href="https://s.apache.org/splittable-do-fn">Splittable
+ * DoFn</a>, the application is replaced by a {@link SplittableParDo}. Every other transform,
+ * including a {@link ParDo} of an {@code OldDoFn}, is returned as-is.
+ */
+class ParDoOverrideFactory implements PTransformOverrideFactory {
+ @Override
+ @SuppressWarnings("unchecked")
+ public <InputT extends PInput, OutputT extends POutput> PTransform<InputT, OutputT> override(
+ PTransform<InputT, OutputT> transform) {
+ if (transform instanceof ParDo.Bound) {
+ ParDo.Bound<InputT, OutputT> parDo = (ParDo.Bound<InputT, OutputT>) transform;
+ // A null result means the ParDo wraps an OldDoFn, which can never be splittable.
+ DoFn<InputT, OutputT> doFn = DoFnAdapters.getDoFn(parDo.getFn());
+ if (doFn != null) {
+ DoFnSignature doFnSignature = DoFnSignatures.INSTANCE.getOrParseSignature(doFn.getClass());
+ if (doFnSignature.processElement().isSplittable()) {
+ return new SplittableParDo(doFn);
+ }
+ }
+ }
+ return transform;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0a24883/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SplittableDoFnTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SplittableDoFnTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SplittableDoFnTest.java
new file mode 100644
index 0000000..84a0cd9
--- /dev/null
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SplittableDoFnTest.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.joda.time.MutableDateTime;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for <a href="https://s.apache.org/splittable-do-fn">splittable</a> {@link DoFn} behavior
+ * using the direct runner.
+ *
+ * <p>TODO: make this use @RunnableOnService.
+ */
+@RunWith(JUnit4.class)
+public class SplittableDoFnTest {
+ /** A half-open range of integer offsets: {@code from} is inclusive, {@code to} is exclusive. */
+ static class OffsetRange implements Serializable {
+ public final int from;
+ public final int to;
+
+ OffsetRange(int from, int to) {
+ this.from = from;
+ this.to = to;
+ }
+ }
+
+ /**
+ * A {@link RestrictionTracker} over an {@link OffsetRange} whose offsets must be claimed in
+ * strictly increasing order via {@link #tryClaim}.
+ */
+ private static class OffsetRangeTracker implements RestrictionTracker<OffsetRange> {
+ private OffsetRange range;
+ private Integer lastClaimedIndex = null;
+
+ OffsetRangeTracker(OffsetRange range) {
+ this.range = checkNotNull(range);
+ }
+
+ @Override
+ public OffsetRange currentRestriction() {
+ return range;
+ }
+
+ /**
+ * Returns the unclaimed remainder of the range and shrinks the current range to the
+ * already-claimed prefix. If nothing has been claimed yet, returns the whole range and leaves
+ * the current range empty.
+ */
+ @Override
+ public OffsetRange checkpoint() {
+ if (lastClaimedIndex == null) {
+ OffsetRange res = range;
+ range = new OffsetRange(range.from, range.from);
+ return res;
+ }
+ OffsetRange res = new OffsetRange(lastClaimedIndex + 1, range.to);
+ this.range = new OffsetRange(range.from, lastClaimedIndex + 1);
+ return res;
+ }
+
+ /**
+ * Attempts to claim offset {@code i}, which must exceed every previously claimed offset.
+ *
+ * @return {@code false} if {@code i} is at or past the end of the range, otherwise {@code true}
+ * @throws IllegalStateException if {@code i} does not exceed the last claimed offset
+ */
+ boolean tryClaim(int i) {
+ checkState(
+ lastClaimedIndex == null || i > lastClaimedIndex,
+ "Trying to claim offset %s, but the last claimed offset was %s",
+ i,
+ lastClaimedIndex);
+ if (i >= range.to) {
+ return false;
+ }
+ lastClaimedIndex = i;
+ return true;
+ }
+ }
+
+ /**
+ * Outputs {@code KV(element, i)} for every index {@code i} of the element's characters. It
+ * returns {@link ProcessContinuation#resume()} after each index divisible by 3, so processing of
+ * an element is spread over several {@link ProcessElement} calls.
+ */
+ static class PairStringWithIndexToLength extends DoFn<String, KV<String, Integer>> {
+ @ProcessElement
+ public ProcessContinuation process(ProcessContext c, OffsetRangeTracker tracker) {
+ for (int i = tracker.currentRestriction().from; tracker.tryClaim(i); ++i) {
+ c.output(KV.of(c.element(), i));
+ if (i % 3 == 0) {
+ return ProcessContinuation.resume();
+ }
+ }
+ return ProcessContinuation.stop();
+ }
+
+ @GetInitialRestriction
+ public OffsetRange getInitialRange(String element) {
+ return new OffsetRange(0, element.length());
+ }
+
+ @SplitRestriction
+ public void splitRange(
+ String element, OffsetRange range, OutputReceiver<OffsetRange> receiver) {
+ receiver.output(new OffsetRange(range.from, (range.from + range.to) / 2));
+ receiver.output(new OffsetRange((range.from + range.to) / 2, range.to));
+ }
+
+ @NewTracker
+ public OffsetRangeTracker newTracker(OffsetRange range) {
+ return new OffsetRangeTracker(range);
+ }
+ }
+
+ /** Wraps each element together with its timestamp, so tests can assert on timestamps. */
+ private static class ReifyTimestampsFn<T> extends DoFn<T, TimestampedValue<T>> {
+ @ProcessElement
+ public void process(ProcessContext c) {
+ c.output(TimestampedValue.of(c.element(), c.timestamp()));
+ }
+ }
+
+ @Test
+ public void testPairWithIndexBasic() {
+ Pipeline p = TestPipeline.create();
+ p.getOptions().setRunner(DirectRunner.class);
+ PCollection<KV<String, Integer>> res =
+ p.apply(Create.of("a", "bb", "ccccc"))
+ .apply(ParDo.of(new PairStringWithIndexToLength()))
+ .setCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()));
+
+ PAssert.that(res)
+ .containsInAnyOrder(
+ Arrays.asList(
+ KV.of("a", 0),
+ KV.of("bb", 0),
+ KV.of("bb", 1),
+ KV.of("ccccc", 0),
+ KV.of("ccccc", 1),
+ KV.of("ccccc", 2),
+ KV.of("ccccc", 3),
+ KV.of("ccccc", 4)));
+
+ p.run();
+ }
+
+ @Test
+ public void testPairWithIndexWindowedTimestamped() {
+ // Tests that Splittable DoFn correctly propagates windowing strategy, windows and timestamps
+ // of elements in the input collection.
+ Pipeline p = TestPipeline.create();
+ p.getOptions().setRunner(DirectRunner.class);
+
+ // Truncate to a whole second so that "now" lies on a boundary of the 1-second-period sliding
+ // windows below. This lets the expected windows be written as [now - i, now - i + 5s).
+ MutableDateTime mutableNow = Instant.now().toMutableDateTime();
+ mutableNow.setMillisOfSecond(0);
+ Instant now = mutableNow.toInstant();
+ Instant nowP1 = now.plus(Duration.standardSeconds(1));
+ Instant nowP2 = now.plus(Duration.standardSeconds(2));
+
+ SlidingWindows windowFn =
+ SlidingWindows.of(Duration.standardSeconds(5)).every(Duration.standardSeconds(1));
+ PCollection<KV<String, Integer>> res =
+ p.apply(
+ Create.timestamped(
+ TimestampedValue.of("a", now),
+ TimestampedValue.of("bb", nowP1),
+ TimestampedValue.of("ccccc", nowP2)))
+ .apply(Window.<String>into(windowFn))
+ .apply(ParDo.of(new PairStringWithIndexToLength()))
+ .setCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of()));
+
+ assertEquals(windowFn, res.getWindowingStrategy().getWindowFn());
+
+ PCollection<TimestampedValue<KV<String, Integer>>> timestamped =
+ res.apply("Reify timestamps", ParDo.of(new ReifyTimestampsFn<KV<String, Integer>>()));
+
+ // Check the windows [now - i, now - i + 5s) for i in 0..3. Each of them contains at least "a".
+ for (int i = 0; i < 4; ++i) {
+ Instant base = now.minus(Duration.standardSeconds(i));
+ IntervalWindow window = new IntervalWindow(base, base.plus(Duration.standardSeconds(5)));
+
+ List<TimestampedValue<KV<String, Integer>>> expectedUnfiltered =
+ Arrays.asList(
+ TimestampedValue.of(KV.of("a", 0), now),
+ TimestampedValue.of(KV.of("bb", 0), nowP1),
+ TimestampedValue.of(KV.of("bb", 1), nowP1),
+ TimestampedValue.of(KV.of("ccccc", 0), nowP2),
+ TimestampedValue.of(KV.of("ccccc", 1), nowP2),
+ TimestampedValue.of(KV.of("ccccc", 2), nowP2),
+ TimestampedValue.of(KV.of("ccccc", 3), nowP2),
+ TimestampedValue.of(KV.of("ccccc", 4), nowP2));
+
+ // Keep only the values whose timestamp falls in [window.start(), window.maxTimestamp()].
+ List<TimestampedValue<KV<String, Integer>>> expected = new ArrayList<>();
+ for (TimestampedValue<KV<String, Integer>> tv : expectedUnfiltered) {
+ if (!window.start().isAfter(tv.getTimestamp())
+ && !tv.getTimestamp().isAfter(window.maxTimestamp())) {
+ expected.add(tv);
+ }
+ }
+ assertFalse(expected.isEmpty());
+
+ PAssert.that(timestamped).inWindow(window).containsInAnyOrder(expected);
+ }
+ p.run();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0a24883/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java
index f806926..789f4b2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java
@@ -76,6 +76,12 @@ public @interface Experimental {
TIMERS,
/** Experimental APIs related to customizing the output time for computed values. */
- OUTPUT_TIME
+ OUTPUT_TIME,
+
+ /**
+ * <a href="https://s.apache.org/splittable-do-fn">Splittable DoFn</a>.
+ * Do not use: API is unstable and runner support is incomplete.
+ */
+ SPLITTABLE_DO_FN,
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a0a24883/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
index fb7fbd4..62da28c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
@@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
+import com.google.auto.value.AutoValue;
import java.io.Serializable;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
@@ -29,6 +30,8 @@ import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.HashMap;
import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.options.PipelineOptions;
@@ -37,9 +40,11 @@ import org.apache.beam.sdk.transforms.OldDoFn.DelegatingAggregator;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.HasDisplayData;
import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowingInternals;
+import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TypeDescriptor;
@@ -341,14 +346,20 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
*/
@Deprecated
WindowingInternals<InputT, OutputT> windowingInternals();
+
+ /**
+ * If this is a splittable {@link DoFn}, returns the {@link RestrictionTracker} associated with
+ * the current {@link ProcessElement} call.
+ *
+ * <p>NOTE(review): {@code RestrictionT} is chosen by the caller, so the returned tracker's type
+ * is not checked against the {@link DoFn}'s actual restriction type. The result for a
+ * non-splittable {@link DoFn} is not specified here (the placeholder implementation further
+ * down in {@link DoFn} returns {@code null}) — confirm.
+ */
+ <RestrictionT> RestrictionTracker<RestrictionT> restrictionTracker();
}
- /** A placeholder for testing handling of output types during {@link DoFn} reflection. */
+ /**
+ * Receives values of type {@code T}, one {@link #output} call per value. For example, a
+ * {@link SplitRestriction} method emits its restriction parts through an {@code OutputReceiver}.
+ */
public interface OutputReceiver<T> {
void output(T output);
}
- /** A placeholder for testing handling of input types during {@link DoFn} reflection. */
+ /** Provides a single value of type {@code T}, returned by {@link #get}. */
public interface InputProvider<T> {
T get();
}
@@ -375,6 +386,10 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
public WindowingInternals<InputT, OutputT> windowingInternals() {
return null;
}
+
+ /**
+ * Always returns {@code null}, as {@code windowingInternals()} above does.
+ *
+ * <p>NOTE(review): if the enclosing class implements the interface that declares
+ * {@code restrictionTracker()}, consider adding {@code @Override} — confirm.
+ */
+ public <RestrictionT> RestrictionTracker<RestrictionT> restrictionTracker() {
+ return null;
+ }
}
/////////////////////////////////////////////////////////////////////////////
@@ -412,14 +427,57 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
public @interface StartBundle {}
/**
- * Annotation for the method to use for processing elements. A subclass of
- * {@link DoFn} must have a method with this annotation satisfying
- * the following constraints in order for it to be executable:
+ * Annotation for the method to use for processing elements. A subclass of {@link DoFn} must have
+ * a method with this annotation.
+ *
+ * <p>The signature of this method must satisfy the following constraints:
+ *
* <ul>
- * <li>It must have at least one argument.
- * <li>Its first argument must be a {@link DoFn.ProcessContext}.
- * <li>Its remaining argument, if any, must be {@link BoundedWindow}.
+ * <li>Its first argument must be a {@link DoFn.ProcessContext}.
+ * <li>If one of its arguments is a subtype of {@link RestrictionTracker}, then it is a <a
+ * href="https://s.apache.org/splittable-do-fn">splittable</a> {@link DoFn} subject to the
+ * separate requirements described below. The items below assume this is not a splittable
+ * {@link DoFn}.
+ * <li>If one of its arguments is {@link BoundedWindow}, this argument corresponds to the window
+ * of the current element. If absent, a runner may perform additional optimizations.
+ * <li>It must return {@code void}.
* </ul>
+ *
+ * <h2>Splittable DoFns (WARNING: work in progress, do not use)</h2>
+ *
+ * <p>A {@link DoFn} is <i>splittable</i> if its {@link ProcessElement} method has a parameter
+ * whose type is a subtype of {@link RestrictionTracker}. This is an advanced feature that the
+ * overwhelming majority of users will never need. Its implementation is still in progress and is
+ * not ready for any use.
+ *
+ * <p>See <a href="https://s.apache.org/splittable-do-fn">the proposal</a> for an overview of the
+ * involved concepts (<i>splittable DoFn</i>, <i>restriction</i>, <i>restriction tracker</i>).
+ *
+ * <p>If a {@link DoFn} is splittable, the following constraints must be respected:
+ *
+ * <ul>
+ * <li>It <i>must</i> define a {@link GetInitialRestriction} method.
+ * <li>It <i>may</i> define a {@link SplitRestriction} method.
+ * <li>It <i>must</i> define a {@link NewTracker} method returning the same type as the type of
+ * the {@link RestrictionTracker} argument of {@link ProcessElement}, which in turn must be a
+ * subtype of {@code RestrictionTracker<R>} where {@code R} is the restriction type returned
+ * by {@link GetInitialRestriction}.
+ * <li>It <i>may</i> define a {@link GetRestrictionCoder} method.
+ * <li>The type of restrictions used by all of these methods must be the same.
+ * <li>Its {@link ProcessElement} method <i>may</i> return a {@link ProcessContinuation} to
+ * indicate whether there is more work to be done for the current element.
+ * <li>Its {@link ProcessElement} method <i>must not</i> use any extra context parameters, such as
+ * {@link BoundedWindow}.
+ * <li>The {@link DoFn} itself <i>may</i> be annotated with {@link BoundedPerElement} or
+ * {@link UnboundedPerElement}, but not both at the same time. If it's not annotated with
+ * either of these, it's assumed to be {@link BoundedPerElement} if its {@link
+ * ProcessElement} method returns {@code void} and {@link UnboundedPerElement} if it
+ * returns a {@link ProcessContinuation}.
+ * </ul>
+ *
+ * <p>A non-splittable {@link DoFn} <i>must not</i> define any of these methods.
+ *
+ * <p>More documentation will be added when the feature becomes ready for general usage.
*/
@Documented
@Retention(RetentionPolicy.RUNTIME)
@@ -455,6 +513,150 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
}
/**
+ * Annotation for the method that maps an element to an initial restriction for a <a
+ * href="https://s.apache.org/splittable-do-fn">splittable</a> {@link DoFn}.
+ *
+ * <p>Signature: {@code RestrictionT getInitialRestriction(InputT element);}
+ *
+ * <p>Every splittable {@link DoFn} must define such a method. A non-splittable {@link DoFn} must
+ * not define one.
+ *
+ * <p>TODO: Make the InputT parameter optional.
+ */
+ @Documented
+ @Retention(RetentionPolicy.RUNTIME)
+ @Target(ElementType.METHOD)
+ @Experimental(Kind.SPLITTABLE_DO_FN)
+ public @interface GetInitialRestriction {}
+
+ /**
+ * Annotation for the method that returns the coder to use for the restriction of a <a
+ * href="https://s.apache.org/splittable-do-fn">splittable</a> {@link DoFn}.
+ *
+ * <p>Optional: if not defined, a coder will be inferred using standard coder inference rules and
+ * the pipeline's {@link Pipeline#getCoderRegistry coder registry}.
+ *
+ * <p>This method will be called only at pipeline construction time.
+ *
+ * <p>Signature: {@code Coder<RestrictionT> getRestrictionCoder();}
+ */
+ @Documented
+ @Retention(RetentionPolicy.RUNTIME)
+ @Target(ElementType.METHOD)
+ @Experimental(Kind.SPLITTABLE_DO_FN)
+ public @interface GetRestrictionCoder {}
+
+ /**
+ * Annotation for the method that splits the restriction of a <a
+ * href="https://s.apache.org/splittable-do-fn">splittable</a> {@link DoFn} into multiple parts to
+ * be processed in parallel.
+ *
+ * <p>Signature: {@code void splitRestriction(InputT element, RestrictionT restriction,
+ * OutputReceiver<RestrictionT> receiver);}
+ *
+ * <p>Each part is emitted by calling {@code receiver.output(part)}.
+ *
+ * <p>Optional: if this method is omitted, the restriction will not be split. This is equivalent to
+ * defining the method and outputting {@code restriction} unchanged.
+ *
+ * <p>TODO: Introduce a parameter for controlling granularity of splitting, e.g. numParts.
+ *
+ * <p>TODO: Make the InputT parameter optional.
+ */
+ @Documented
+ @Retention(RetentionPolicy.RUNTIME)
+ @Target(ElementType.METHOD)
+ @Experimental(Kind.SPLITTABLE_DO_FN)
+ public @interface SplitRestriction {}
+
+ /**
+ * Annotation for the method that creates a new {@link RestrictionTracker} for the restriction of
+ * a <a href="https://s.apache.org/splittable-do-fn">splittable</a> {@link DoFn}.
+ *
+ * <p>Signature: {@code MyRestrictionTracker newTracker(RestrictionT restriction);} where {@code
+ * MyRestrictionTracker} must be a subtype of {@code RestrictionTracker<RestrictionT>}.
+ *
+ * <p>Every splittable {@link DoFn} must define such a method. Its return type must be the same as
+ * the type of the {@link RestrictionTracker} parameter of the {@link ProcessElement} method.
+ */
+ @Documented
+ @Retention(RetentionPolicy.RUNTIME)
+ @Target(ElementType.METHOD)
+ @Experimental(Kind.SPLITTABLE_DO_FN)
+ public @interface NewTracker {}
+
+ /**
+ * Annotation on a <a href="https://s.apache.org/splittable-do-fn">splittable</a> {@link DoFn}
+ * specifying that the {@link DoFn} performs a bounded amount of work per input element, so
+ * applying it to a bounded {@link PCollection} will also produce a bounded {@link PCollection}.
+ * It is an error to specify this on a non-splittable {@link DoFn}, or together with
+ * {@link UnboundedPerElement}.
+ *
+ * <p>If neither annotation is present, a splittable {@link DoFn} is assumed to be bounded per
+ * element when its {@link ProcessElement} method returns {@code void}.
+ */
+ @Documented
+ @Retention(RetentionPolicy.RUNTIME)
+ @Target(ElementType.TYPE)
+ @Experimental(Kind.SPLITTABLE_DO_FN)
+ public @interface BoundedPerElement {}
+
+ /**
+ * Annotation on a <a href="https://s.apache.org/splittable-do-fn">splittable</a> {@link DoFn}
+ * specifying that the {@link DoFn} performs an unbounded amount of work per input element, so
+ * applying it to a bounded {@link PCollection} will produce an unbounded {@link PCollection}. It
+ * is an error to specify this on a non-splittable {@link DoFn}, or together with
+ * {@link BoundedPerElement}.
+ *
+ * <p>If neither annotation is present, a splittable {@link DoFn} is assumed to be unbounded per
+ * element when its {@link ProcessElement} method returns a {@link ProcessContinuation}.
+ */
+ @Documented
+ @Retention(RetentionPolicy.RUNTIME)
+ @Target(ElementType.TYPE)
+ @Experimental(Kind.SPLITTABLE_DO_FN)
+ public @interface UnboundedPerElement {}
+
+ // Shared instance returned by ProcessContinuation.stop(). It has no resume, a zero resume delay
+ // and no watermark. The constructor arguments follow the declaration order of
+ // ProcessContinuation's abstract properties: shouldResume, resumeDelay, getWatermark.
+ // This can't be put into ProcessContinuation itself due to the following problem:
+ // http://ternarysearch.blogspot.com/2013/07/static-initialization-deadlock.html
+ // (presumably because the AutoValue-generated class is a subclass of ProcessContinuation, and a
+ // superclass whose static initializer instantiates its subclass can deadlock class init).
+ private static final ProcessContinuation PROCESS_CONTINUATION_STOP =
+ new AutoValue_DoFn_ProcessContinuation(false, Duration.ZERO, null);
+
+ /**
+ * When used as a return value of {@link ProcessElement}, indicates whether there is more work to
+ * be done for the current element.
+ *
+ * <p>Instances are AutoValue value objects. The {@code with*} methods return modified copies and
+ * leave the receiver unchanged.
+ */
+ @Experimental(Kind.SPLITTABLE_DO_FN)
+ @AutoValue
+ public abstract static class ProcessContinuation {
+ /**
+ * Indicates that there is no more work to be done for the current element.
+ *
+ * <p>Returns a shared instance with {@code shouldResume() == false}, zero resume delay and no
+ * watermark.
+ */
+ public static ProcessContinuation stop() {
+ return PROCESS_CONTINUATION_STOP;
+ }
+
+ /**
+ * Indicates that there is more work to be done for the current element.
+ *
+ * <p>Returns a new instance with zero resume delay and no watermark. Use
+ * {@link #withResumeDelay} and {@link #withWatermark} to adjust those.
+ */
+ public static ProcessContinuation resume() {
+ return new AutoValue_DoFn_ProcessContinuation(true, Duration.ZERO, null);
+ }
+
+ // NOTE: AutoValue orders the generated constructor's parameters by the declaration order of
+ // the abstract property methods below. Every "new AutoValue_DoFn_ProcessContinuation(...)" call
+ // therefore passes (shouldResume, resumeDelay, watermark). Do not reorder these methods
+ // without updating those calls.
+
+ /**
+ * If false, the {@link DoFn} promises that there is no more work remaining for the current
+ * element, so the runner should not resume the {@link ProcessElement} call.
+ */
+ public abstract boolean shouldResume();
+
+ /**
+ * A minimum duration that should elapse between the end of this {@link ProcessElement} call and
+ * the {@link ProcessElement} call continuing processing of the same element. By default, zero.
+ */
+ public abstract Duration resumeDelay();
+
+ /**
+ * A lower bound provided by the {@link DoFn} on timestamps of the output that will be emitted
+ * by future {@link ProcessElement} calls continuing processing of the current element.
+ *
+ * <p>A runner should treat an absent value as equivalent to the timestamp of the input element.
+ */
+ @Nullable
+ public abstract Instant getWatermark();
+
+ /**
+ * Builder method to set the value of {@link #resumeDelay()}.
+ *
+ * <p>Returns a copy of this object with only the resume delay replaced. The delay must be
+ * non-null, because the generated constructor rejects null for non-{@code @Nullable}
+ * properties.
+ */
+ public ProcessContinuation withResumeDelay(Duration resumeDelay) {
+ return new AutoValue_DoFn_ProcessContinuation(
+ shouldResume(), resumeDelay, getWatermark());
+ }
+
+ /**
+ * Builder method to set the value of {@link #getWatermark()}.
+ *
+ * <p>Returns a copy of this object with only the watermark replaced.
+ */
+ public ProcessContinuation withWatermark(Instant watermark) {
+ return new AutoValue_DoFn_ProcessContinuation(
+ shouldResume(), resumeDelay(), watermark);
+ }
+ }
+
+ /**
* Returns an {@link Aggregator} with aggregation logic specified by the
* {@link CombineFn} argument. The name provided must be unique across
* {@link Aggregator}s created within the {@link DoFn}. Aggregators can only be created