You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jb...@apache.org on 2017/07/20 17:09:40 UTC
[13/28] beam git commit: Revert "[BEAM-2610] This closes #3553"
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
index 4b5d5f5..171171f 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
@@ -45,19 +45,14 @@ import org.apache.beam.sdk.coders.CoderProviders;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.MapCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.testing.LargeKeys;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.testing.TestStream;
-import org.apache.beam.sdk.testing.UsesTestStream;
import org.apache.beam.sdk.testing.ValidatesRunner;
import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.InvalidWindows;
-import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Sessions;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.Window;
@@ -189,40 +184,6 @@ public class GroupByKeyTest {
p.run();
}
- /**
- * Tests that when a processing time timers comes in after a window is expired it does not cause a
- * spurious output.
- */
- @Test
- @Category({ValidatesRunner.class, UsesTestStream.class})
- public void testCombiningAccumulatingProcessingTime() throws Exception {
- PCollection<Integer> triggeredSums =
- p.apply(
- TestStream.create(VarIntCoder.of())
- .advanceWatermarkTo(new Instant(0))
- .addElements(
- TimestampedValue.of(2, new Instant(2)),
- TimestampedValue.of(5, new Instant(5)))
- .advanceWatermarkTo(new Instant(100))
- .advanceProcessingTime(Duration.millis(10))
- .advanceWatermarkToInfinity())
- .apply(
- Window.<Integer>into(FixedWindows.of(Duration.millis(100)))
- .withTimestampCombiner(TimestampCombiner.EARLIEST)
- .accumulatingFiredPanes()
- .withAllowedLateness(Duration.ZERO)
- .triggering(
- Repeatedly.forever(
- AfterProcessingTime.pastFirstElementInPane()
- .plusDelayOf(Duration.millis(10)))))
- .apply(Sum.integersGlobally().withoutDefaults());
-
- PAssert.that(triggeredSums)
- .containsInAnyOrder(7);
-
- p.run();
- }
-
@Test
public void testGroupByKeyNonDeterministic() throws Exception {
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index fa4949e..c67cf2a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -62,8 +62,6 @@ import org.apache.beam.sdk.coders.SetCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.io.GenerateSequence;
-import org.apache.beam.sdk.options.Default;
-import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.CombiningState;
import org.apache.beam.sdk.state.MapState;
@@ -1593,108 +1591,6 @@ public class ParDoTest implements Serializable {
}
@Test
- public void testStateNotKeyed() {
- final String stateId = "foo";
-
- DoFn<String, Integer> fn =
- new DoFn<String, Integer>() {
-
- @StateId(stateId)
- private final StateSpec<ValueState<Integer>> intState =
- StateSpecs.value();
-
- @ProcessElement
- public void processElement(
- ProcessContext c, @StateId(stateId) ValueState<Integer> state) {}
- };
-
- thrown.expect(IllegalArgumentException.class);
- thrown.expectMessage("state");
- thrown.expectMessage("KvCoder");
-
- pipeline.apply(Create.of("hello", "goodbye", "hello again")).apply(ParDo.of(fn));
- }
-
- @Test
- public void testStateNotDeterministic() {
- final String stateId = "foo";
-
- // DoubleCoder is not deterministic, so this should crash
- DoFn<KV<Double, String>, Integer> fn =
- new DoFn<KV<Double, String>, Integer>() {
-
- @StateId(stateId)
- private final StateSpec<ValueState<Integer>> intState =
- StateSpecs.value();
-
- @ProcessElement
- public void processElement(
- ProcessContext c, @StateId(stateId) ValueState<Integer> state) {}
- };
-
- thrown.expect(IllegalArgumentException.class);
- thrown.expectMessage("state");
- thrown.expectMessage("deterministic");
-
- pipeline
- .apply(Create.of(KV.of(1.0, "hello"), KV.of(5.4, "goodbye"), KV.of(7.2, "hello again")))
- .apply(ParDo.of(fn));
- }
-
- @Test
- public void testTimerNotKeyed() {
- final String timerId = "foo";
-
- DoFn<String, Integer> fn =
- new DoFn<String, Integer>() {
-
- @TimerId(timerId)
- private final TimerSpec timer = TimerSpecs.timer(TimeDomain.EVENT_TIME);
-
- @ProcessElement
- public void processElement(
- ProcessContext c, @TimerId(timerId) Timer timer) {}
-
- @OnTimer(timerId)
- public void onTimer() {}
- };
-
- thrown.expect(IllegalArgumentException.class);
- thrown.expectMessage("timer");
- thrown.expectMessage("KvCoder");
-
- pipeline.apply(Create.of("hello", "goodbye", "hello again")).apply(ParDo.of(fn));
- }
-
- @Test
- public void testTimerNotDeterministic() {
- final String timerId = "foo";
-
- // DoubleCoder is not deterministic, so this should crash
- DoFn<KV<Double, String>, Integer> fn =
- new DoFn<KV<Double, String>, Integer>() {
-
- @TimerId(timerId)
- private final TimerSpec timer = TimerSpecs.timer(TimeDomain.EVENT_TIME);
-
- @ProcessElement
- public void processElement(
- ProcessContext c, @TimerId(timerId) Timer timer) {}
-
- @OnTimer(timerId)
- public void onTimer() {}
- };
-
- thrown.expect(IllegalArgumentException.class);
- thrown.expectMessage("timer");
- thrown.expectMessage("deterministic");
-
- pipeline
- .apply(Create.of(KV.of(1.0, "hello"), KV.of(5.4, "goodbye"), KV.of(7.2, "hello again")))
- .apply(ParDo.of(fn));
- }
-
- @Test
@Category({ValidatesRunner.class, UsesStatefulParDo.class})
public void testValueStateCoderInference() {
final String stateId = "foo";
@@ -3046,65 +2942,4 @@ public class ParDoTest implements Serializable {
// If it doesn't crash, we made it!
}
-
- /** A {@link PipelineOptions} subclass for testing passing to a {@link DoFn}. */
- public interface MyOptions extends PipelineOptions {
- @Default.String("fake option")
- String getFakeOption();
- void setFakeOption(String value);
- }
-
- @Test
- @Category(ValidatesRunner.class)
- public void testPipelineOptionsParameter() {
- PCollection<String> results = pipeline
- .apply(Create.of(1))
- .apply(
- ParDo.of(
- new DoFn<Integer, String>() {
- @ProcessElement
- public void process(ProcessContext c, PipelineOptions options) {
- c.output(options.as(MyOptions.class).getFakeOption());
- }
- }));
-
- String testOptionValue = "not fake anymore";
- pipeline.getOptions().as(MyOptions.class).setFakeOption(testOptionValue);
- PAssert.that(results).containsInAnyOrder("not fake anymore");
-
- pipeline.run();
- }
-
- @Test
- @Category({ValidatesRunner.class, UsesTimersInParDo.class})
- public void testPipelineOptionsParameterOnTimer() {
- final String timerId = "thisTimer";
-
- PCollection<String> results =
- pipeline
- .apply(Create.of(KV.of(0, 0)))
- .apply(
- ParDo.of(
- new DoFn<KV<Integer, Integer>, String>() {
- @TimerId(timerId)
- private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
-
- @ProcessElement
- public void process(
- ProcessContext c, BoundedWindow w, @TimerId(timerId) Timer timer) {
- timer.set(w.maxTimestamp());
- }
-
- @OnTimer(timerId)
- public void onTimer(OnTimerContext c, PipelineOptions options) {
- c.output(options.as(MyOptions.class).getFakeOption());
- }
- }));
-
- String testOptionValue = "not fake anymore";
- pipeline.getOptions().as(MyOptions.class).setFakeOption(testOptionValue);
- PAssert.that(results).containsInAnyOrder("not fake anymore");
-
- pipeline.run();
- }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
index d2d2529..02a44d2 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
@@ -18,14 +18,10 @@
package org.apache.beam.sdk.transforms;
import static com.google.common.base.Preconditions.checkState;
-import static org.apache.beam.sdk.testing.TestPipeline.testingPipelineOptions;
-import static org.apache.beam.sdk.transforms.DoFn.ProcessContinuation.resume;
-import static org.apache.beam.sdk.transforms.DoFn.ProcessContinuation.stop;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
-import com.google.common.collect.Ordering;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
@@ -33,9 +29,6 @@ import java.util.List;
import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.io.range.OffsetRange;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestStream;
@@ -44,6 +37,7 @@ import org.apache.beam.sdk.testing.UsesSplittableParDoWithWindowedSideInputs;
import org.apache.beam.sdk.testing.UsesTestStream;
import org.apache.beam.sdk.testing.ValidatesRunner;
import org.apache.beam.sdk.transforms.DoFn.BoundedPerElement;
+import org.apache.beam.sdk.transforms.splittabledofn.OffsetRange;
import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
@@ -73,16 +67,10 @@ public class SplittableDoFnTest implements Serializable {
static class PairStringWithIndexToLength extends DoFn<String, KV<String, Integer>> {
@ProcessElement
- public ProcessContinuation process(ProcessContext c, OffsetRangeTracker tracker) {
- for (long i = tracker.currentRestriction().getFrom(), numIterations = 0;
- tracker.tryClaim(i);
- ++i, ++numIterations) {
+ public void process(ProcessContext c, OffsetRangeTracker tracker) {
+ for (long i = tracker.currentRestriction().getFrom(); tracker.tryClaim(i); ++i) {
c.output(KV.of(c.element(), (int) i));
- if (numIterations % 3 == 0) {
- return resume();
- }
}
- return stop();
}
@GetInitialRestriction
@@ -105,25 +93,8 @@ public class SplittableDoFnTest implements Serializable {
}
}
- private static PipelineOptions streamingTestPipelineOptions() {
- // Using testing options with streaming=true makes it possible to enable UsesSplittableParDo
- // tests in Dataflow runner, because as of writing, it can run Splittable DoFn only in
- // streaming mode.
- // This is a no-op for other runners currently (Direct runner doesn't care, and other
- // runners don't implement SDF at all yet).
- //
- // This is a workaround until https://issues.apache.org/jira/browse/BEAM-1620
- // is properly implemented and supports marking tests as streaming-only.
- //
- // https://issues.apache.org/jira/browse/BEAM-2483 specifically tracks the removal of the
- // current workaround.
- PipelineOptions options = testingPipelineOptions();
- options.as(StreamingOptions.class).setStreaming(true);
- return options;
- }
-
@Rule
- public final transient TestPipeline p = TestPipeline.fromOptions(streamingTestPipelineOptions());
+ public final transient TestPipeline p = TestPipeline.create();
@Test
@Category({ValidatesRunner.class, UsesSplittableParDo.class})
@@ -211,12 +182,6 @@ public class SplittableDoFnTest implements Serializable {
private static class SDFWithMultipleOutputsPerBlock extends DoFn<String, Integer> {
private static final int MAX_INDEX = 98765;
- private final int numClaimsPerCall;
-
- private SDFWithMultipleOutputsPerBlock(int numClaimsPerCall) {
- this.numClaimsPerCall = numClaimsPerCall;
- }
-
private static int snapToNextBlock(int index, int[] blockStarts) {
for (int i = 1; i < blockStarts.length; ++i) {
if (index > blockStarts[i - 1] && index <= blockStarts[i]) {
@@ -227,20 +192,14 @@ public class SplittableDoFnTest implements Serializable {
}
@ProcessElement
- public ProcessContinuation processElement(ProcessContext c, OffsetRangeTracker tracker) {
+ public void processElement(ProcessContext c, OffsetRangeTracker tracker) {
int[] blockStarts = {-1, 0, 12, 123, 1234, 12345, 34567, MAX_INDEX};
int trueStart = snapToNextBlock((int) tracker.currentRestriction().getFrom(), blockStarts);
- for (int i = trueStart, numIterations = 1;
- tracker.tryClaim(blockStarts[i]);
- ++i, ++numIterations) {
+ for (int i = trueStart; tracker.tryClaim(blockStarts[i]); ++i) {
for (int index = blockStarts[i]; index < blockStarts[i + 1]; ++index) {
c.output(index);
}
- if (numIterations == numClaimsPerCall) {
- return resume();
- }
}
- return stop();
}
@GetInitialRestriction
@@ -253,7 +212,7 @@ public class SplittableDoFnTest implements Serializable {
@Category({ValidatesRunner.class, UsesSplittableParDo.class})
public void testOutputAfterCheckpoint() throws Exception {
PCollection<Integer> outputs = p.apply(Create.of("foo"))
- .apply(ParDo.of(new SDFWithMultipleOutputsPerBlock(3)));
+ .apply(ParDo.of(new SDFWithMultipleOutputsPerBlock()));
PAssert.thatSingleton(outputs.apply(Count.<Integer>globally()))
.isEqualTo((long) SDFWithMultipleOutputsPerBlock.MAX_INDEX);
p.run();
@@ -328,105 +287,9 @@ public class SplittableDoFnTest implements Serializable {
PAssert.that(res).containsInAnyOrder("a:0", "a:1", "a:2", "a:3", "b:4", "b:5", "b:6", "b:7");
p.run();
- }
-
- @BoundedPerElement
- private static class SDFWithMultipleOutputsPerBlockAndSideInput
- extends DoFn<Integer, KV<String, Integer>> {
- private static final int MAX_INDEX = 98765;
- private final PCollectionView<String> sideInput;
- private final int numClaimsPerCall;
-
- public SDFWithMultipleOutputsPerBlockAndSideInput(
- PCollectionView<String> sideInput, int numClaimsPerCall) {
- this.sideInput = sideInput;
- this.numClaimsPerCall = numClaimsPerCall;
- }
-
- private static int snapToNextBlock(int index, int[] blockStarts) {
- for (int i = 1; i < blockStarts.length; ++i) {
- if (index > blockStarts[i - 1] && index <= blockStarts[i]) {
- return i;
- }
- }
- throw new IllegalStateException("Shouldn't get here");
- }
-
- @ProcessElement
- public ProcessContinuation processElement(ProcessContext c, OffsetRangeTracker tracker) {
- int[] blockStarts = {-1, 0, 12, 123, 1234, 12345, 34567, MAX_INDEX};
- int trueStart = snapToNextBlock((int) tracker.currentRestriction().getFrom(), blockStarts);
- for (int i = trueStart, numIterations = 1;
- tracker.tryClaim(blockStarts[i]);
- ++i, ++numIterations) {
- for (int index = blockStarts[i]; index < blockStarts[i + 1]; ++index) {
- c.output(KV.of(c.sideInput(sideInput) + ":" + c.element(), index));
- }
- if (numIterations == numClaimsPerCall) {
- return resume();
- }
- }
- return stop();
- }
-
- @GetInitialRestriction
- public OffsetRange getInitialRange(Integer element) {
- return new OffsetRange(0, MAX_INDEX);
- }
- }
-
- @Test
- @Category({
- ValidatesRunner.class,
- UsesSplittableParDo.class,
- UsesSplittableParDoWithWindowedSideInputs.class
- })
- public void testWindowedSideInputWithCheckpoints() throws Exception {
- PCollection<Integer> mainInput =
- p.apply("main",
- Create.timestamped(
- TimestampedValue.of(0, new Instant(0)),
- TimestampedValue.of(1, new Instant(1)),
- TimestampedValue.of(2, new Instant(2)),
- TimestampedValue.of(3, new Instant(3))))
- .apply("window 1", Window.<Integer>into(FixedWindows.of(Duration.millis(1))));
-
- PCollectionView<String> sideInput =
- p.apply("side",
- Create.timestamped(
- TimestampedValue.of("a", new Instant(0)),
- TimestampedValue.of("b", new Instant(2))))
- .apply("window 2", Window.<String>into(FixedWindows.of(Duration.millis(2))))
- .apply("singleton", View.<String>asSingleton());
-
- PCollection<KV<String, Integer>> res =
- mainInput.apply(
- ParDo.of(
- new SDFWithMultipleOutputsPerBlockAndSideInput(
- sideInput, 3 /* numClaimsPerCall */))
- .withSideInputs(sideInput));
- PCollection<KV<String, Iterable<Integer>>> grouped =
- res.apply(GroupByKey.<String, Integer>create());
-
- PAssert.that(grouped.apply(Keys.<String>create()))
- .containsInAnyOrder("a:0", "a:1", "b:2", "b:3");
- PAssert.that(grouped)
- .satisfies(
- new SerializableFunction<Iterable<KV<String, Iterable<Integer>>>, Void>() {
- @Override
- public Void apply(Iterable<KV<String, Iterable<Integer>>> input) {
- List<Integer> expected = new ArrayList<>();
- for (int i = 0; i < SDFWithMultipleOutputsPerBlockAndSideInput.MAX_INDEX; ++i) {
- expected.add(i);
- }
- for (KV<String, Iterable<Integer>> kv : input) {
- assertEquals(expected, Ordering.<Integer>natural().sortedCopy(kv.getValue()));
- }
- return null;
- }
- });
- p.run();
+ // TODO: also add test coverage when the SDF checkpoints - the resumed call should also
+ // properly access side inputs.
// TODO: also test coverage when some of the windows of the side input are not ready.
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
index 2098c66..3edb194 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java
@@ -17,8 +17,6 @@
*/
package org.apache.beam.sdk.transforms.reflect;
-import static org.apache.beam.sdk.transforms.DoFn.ProcessContinuation.resume;
-import static org.apache.beam.sdk.transforms.DoFn.ProcessContinuation.stop;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertEquals;
@@ -91,8 +89,8 @@ public class DoFnInvokersTest {
when(mockArgumentProvider.processContext(Matchers.<DoFn>any())).thenReturn(mockProcessContext);
}
- private DoFn.ProcessContinuation invokeProcessElement(DoFn<String, String> fn) {
- return DoFnInvokers.invokerFor(fn).invokeProcessElement(mockArgumentProvider);
+ private void invokeProcessElement(DoFn<String, String> fn) {
+ DoFnInvokers.invokerFor(fn).invokeProcessElement(mockArgumentProvider);
}
private void invokeOnTimer(String timerId, DoFn<String, String> fn) {
@@ -121,7 +119,7 @@ public class DoFnInvokersTest {
public void processElement(ProcessContext c) throws Exception {}
}
MockFn mockFn = mock(MockFn.class);
- assertEquals(stop(), invokeProcessElement(mockFn));
+ invokeProcessElement(mockFn);
verify(mockFn).processElement(mockProcessContext);
}
@@ -142,7 +140,7 @@ public class DoFnInvokersTest {
public void testDoFnWithProcessElementInterface() throws Exception {
IdentityUsingInterfaceWithProcessElement fn =
mock(IdentityUsingInterfaceWithProcessElement.class);
- assertEquals(stop(), invokeProcessElement(fn));
+ invokeProcessElement(fn);
verify(fn).processElement(mockProcessContext);
}
@@ -163,14 +161,14 @@ public class DoFnInvokersTest {
@Test
public void testDoFnWithMethodInSuperclass() throws Exception {
IdentityChildWithoutOverride fn = mock(IdentityChildWithoutOverride.class);
- assertEquals(stop(), invokeProcessElement(fn));
+ invokeProcessElement(fn);
verify(fn).process(mockProcessContext);
}
@Test
public void testDoFnWithMethodInSubclass() throws Exception {
IdentityChildWithOverride fn = mock(IdentityChildWithOverride.class);
- assertEquals(stop(), invokeProcessElement(fn));
+ invokeProcessElement(fn);
verify(fn).process(mockProcessContext);
}
@@ -181,7 +179,7 @@ public class DoFnInvokersTest {
public void processElement(ProcessContext c, IntervalWindow w) throws Exception {}
}
MockFn fn = mock(MockFn.class);
- assertEquals(stop(), invokeProcessElement(fn));
+ invokeProcessElement(fn);
verify(fn).processElement(mockProcessContext, mockWindow);
}
@@ -205,7 +203,7 @@ public class DoFnInvokersTest {
throws Exception {}
}
MockFn fn = mock(MockFn.class);
- assertEquals(stop(), invokeProcessElement(fn));
+ invokeProcessElement(fn);
verify(fn).processElement(mockProcessContext, mockState);
}
@@ -231,35 +229,11 @@ public class DoFnInvokersTest {
public void onTimer() {}
}
MockFn fn = mock(MockFn.class);
- assertEquals(stop(), invokeProcessElement(fn));
+ invokeProcessElement(fn);
verify(fn).processElement(mockProcessContext, mockTimer);
}
@Test
- public void testDoFnWithReturn() throws Exception {
- class MockFn extends DoFn<String, String> {
- @DoFn.ProcessElement
- public ProcessContinuation processElement(ProcessContext c, SomeRestrictionTracker tracker)
- throws Exception {
- return null;
- }
-
- @GetInitialRestriction
- public SomeRestriction getInitialRestriction(String element) {
- return null;
- }
-
- @NewTracker
- public SomeRestrictionTracker newTracker(SomeRestriction restriction) {
- return null;
- }
- }
- MockFn fn = mock(MockFn.class);
- when(fn.processElement(mockProcessContext, null)).thenReturn(resume());
- assertEquals(resume(), invokeProcessElement(fn));
- }
-
- @Test
public void testDoFnWithStartBundleSetupTeardown() throws Exception {
class MockFn extends DoFn<String, String> {
@ProcessElement
@@ -314,9 +288,7 @@ public class DoFnInvokersTest {
/** Public so Mockito can do "delegatesTo()" in the test below. */
public static class MockFn extends DoFn<String, String> {
@ProcessElement
- public ProcessContinuation processElement(ProcessContext c, SomeRestrictionTracker tracker) {
- return null;
- }
+ public void processElement(ProcessContext c, SomeRestrictionTracker tracker) {}
@GetInitialRestriction
public SomeRestriction getInitialRestriction(String element) {
@@ -368,7 +340,7 @@ public class DoFnInvokersTest {
.splitRestriction(
eq("blah"), same(restriction), Mockito.<DoFn.OutputReceiver<SomeRestriction>>any());
when(fn.newTracker(restriction)).thenReturn(tracker);
- when(fn.processElement(mockProcessContext, tracker)).thenReturn(resume());
+ fn.processElement(mockProcessContext, tracker);
assertEquals(coder, invoker.invokeGetRestrictionCoder(CoderRegistry.createDefault()));
assertEquals(restriction, invoker.invokeGetInitialRestriction("blah"));
@@ -384,8 +356,6 @@ public class DoFnInvokersTest {
});
assertEquals(Arrays.asList(part1, part2, part3), outputs);
assertEquals(tracker, invoker.invokeNewTracker(restriction));
- assertEquals(
- resume(),
invoker.invokeProcessElement(
new FakeArgumentProvider<String, String>() {
@Override
@@ -397,7 +367,7 @@ public class DoFnInvokersTest {
public RestrictionTracker<?> restrictionTracker() {
return tracker;
}
- }));
+ });
}
private static class RestrictionWithDefaultTracker
@@ -471,7 +441,7 @@ public class DoFnInvokersTest {
assertEquals("foo", output);
}
});
- assertEquals(stop(), invoker.invokeProcessElement(mockArgumentProvider));
+ invoker.invokeProcessElement(mockArgumentProvider);
assertThat(
invoker.invokeNewTracker(new RestrictionWithDefaultTracker()),
instanceOf(DefaultTracker.class));
@@ -561,14 +531,14 @@ public class DoFnInvokersTest {
@Test
public void testLocalPrivateDoFnClass() throws Exception {
PrivateDoFnClass fn = mock(PrivateDoFnClass.class);
- assertEquals(stop(), invokeProcessElement(fn));
+ invokeProcessElement(fn);
verify(fn).processThis(mockProcessContext);
}
@Test
public void testStaticPackagePrivateDoFnClass() throws Exception {
DoFn<String, String> fn = mock(DoFnInvokersTestHelper.newStaticPackagePrivateDoFn().getClass());
- assertEquals(stop(), invokeProcessElement(fn));
+ invokeProcessElement(fn);
DoFnInvokersTestHelper.verifyStaticPackagePrivateDoFn(fn, mockProcessContext);
}
@@ -576,28 +546,28 @@ public class DoFnInvokersTest {
public void testInnerPackagePrivateDoFnClass() throws Exception {
DoFn<String, String> fn =
mock(new DoFnInvokersTestHelper().newInnerPackagePrivateDoFn().getClass());
- assertEquals(stop(), invokeProcessElement(fn));
+ invokeProcessElement(fn);
DoFnInvokersTestHelper.verifyInnerPackagePrivateDoFn(fn, mockProcessContext);
}
@Test
public void testStaticPrivateDoFnClass() throws Exception {
DoFn<String, String> fn = mock(DoFnInvokersTestHelper.newStaticPrivateDoFn().getClass());
- assertEquals(stop(), invokeProcessElement(fn));
+ invokeProcessElement(fn);
DoFnInvokersTestHelper.verifyStaticPrivateDoFn(fn, mockProcessContext);
}
@Test
public void testInnerPrivateDoFnClass() throws Exception {
DoFn<String, String> fn = mock(new DoFnInvokersTestHelper().newInnerPrivateDoFn().getClass());
- assertEquals(stop(), invokeProcessElement(fn));
+ invokeProcessElement(fn);
DoFnInvokersTestHelper.verifyInnerPrivateDoFn(fn, mockProcessContext);
}
@Test
public void testAnonymousInnerDoFn() throws Exception {
DoFn<String, String> fn = mock(new DoFnInvokersTestHelper().newInnerAnonymousDoFn().getClass());
- assertEquals(stop(), invokeProcessElement(fn));
+ invokeProcessElement(fn);
DoFnInvokersTestHelper.verifyInnerAnonymousDoFn(fn, mockProcessContext);
}
@@ -634,31 +604,6 @@ public class DoFnInvokersTest {
}
@Test
- public void testProcessElementExceptionWithReturn() throws Exception {
- thrown.expect(UserCodeException.class);
- thrown.expectMessage("bogus");
- DoFnInvokers.invokerFor(
- new DoFn<Integer, Integer>() {
- @ProcessElement
- public ProcessContinuation processElement(
- @SuppressWarnings("unused") ProcessContext c, SomeRestrictionTracker tracker) {
- throw new IllegalArgumentException("bogus");
- }
-
- @GetInitialRestriction
- public SomeRestriction getInitialRestriction(Integer element) {
- return null;
- }
-
- @NewTracker
- public SomeRestrictionTracker newTracker(SomeRestriction restriction) {
- return null;
- }
- })
- .invokeProcessElement(new FakeArgumentProvider<Integer, Integer>());
- }
-
- @Test
public void testStartBundleException() throws Exception {
DoFnInvoker<Integer, Integer> invoker =
DoFnInvokers.invokerFor(
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesProcessElementTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesProcessElementTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesProcessElementTest.java
index 44ae5c4..d321f54 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesProcessElementTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesProcessElementTest.java
@@ -50,7 +50,7 @@ public class DoFnSignaturesProcessElementTest {
@Test
public void testBadReturnType() throws Exception {
thrown.expect(IllegalArgumentException.class);
- thrown.expectMessage("Must return void or ProcessContinuation");
+ thrown.expectMessage("Must return void");
analyzeProcessElementMethod(
new AnonymousMethod() {
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java
index 08af65e..07b3348 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesSplittableDoFnTest.java
@@ -52,8 +52,7 @@ import org.junit.runners.JUnit4;
public class DoFnSignaturesSplittableDoFnTest {
@Rule public ExpectedException thrown = ExpectedException.none();
- private abstract static class SomeRestriction
- implements HasDefaultTracker<SomeRestriction, SomeRestrictionTracker> {}
+ private static class SomeRestriction {}
private abstract static class SomeRestrictionTracker
implements RestrictionTracker<SomeRestriction> {}
@@ -61,20 +60,6 @@ public class DoFnSignaturesSplittableDoFnTest {
private abstract static class SomeRestrictionCoder extends StructuredCoder<SomeRestriction> {}
@Test
- public void testReturnsProcessContinuation() throws Exception {
- DoFnSignature.ProcessElementMethod signature =
- analyzeProcessElementMethod(
- new AnonymousMethod() {
- private DoFn.ProcessContinuation method(
- DoFn<Integer, String>.ProcessContext context) {
- return null;
- }
- });
-
- assertTrue(signature.hasReturnValue());
- }
-
- @Test
public void testHasRestrictionTracker() throws Exception {
DoFnSignature.ProcessElementMethod signature =
analyzeProcessElementMethod(
@@ -115,6 +100,11 @@ public class DoFnSignaturesSplittableDoFnTest {
public SomeRestriction getInitialRestriction(Integer element) {
return null;
}
+
+ @NewTracker
+ public SomeRestrictionTracker newTracker(SomeRestriction restriction) {
+ return null;
+ }
}
@BoundedPerElement
@@ -140,55 +130,6 @@ public class DoFnSignaturesSplittableDoFnTest {
.isBoundedPerElement());
}
- private static class BaseFnWithoutContinuation extends DoFn<Integer, String> {
- @ProcessElement
- public void processElement(ProcessContext context, SomeRestrictionTracker tracker) {}
-
- @GetInitialRestriction
- public SomeRestriction getInitialRestriction(Integer element) {
- return null;
- }
- }
-
- private static class BaseFnWithContinuation extends DoFn<Integer, String> {
- @ProcessElement
- public ProcessContinuation processElement(
- ProcessContext context, SomeRestrictionTracker tracker) {
- return null;
- }
-
- @GetInitialRestriction
- public SomeRestriction getInitialRestriction(Integer element) {
- return null;
- }
- }
-
- @Test
- public void testSplittableBoundednessInferredFromReturnValue() throws Exception {
- assertEquals(
- PCollection.IsBounded.BOUNDED,
- DoFnSignatures.getSignature(BaseFnWithoutContinuation.class).isBoundedPerElement());
- assertEquals(
- PCollection.IsBounded.UNBOUNDED,
- DoFnSignatures.getSignature(BaseFnWithContinuation.class).isBoundedPerElement());
- }
-
- @Test
- public void testSplittableRespectsBoundednessAnnotation() throws Exception {
- @BoundedPerElement
- class BoundedFnWithContinuation extends BaseFnWithContinuation {}
-
- assertEquals(
- PCollection.IsBounded.BOUNDED,
- DoFnSignatures.getSignature(BoundedFnWithContinuation.class).isBoundedPerElement());
-
- @UnboundedPerElement
- class UnboundedFnWithContinuation extends BaseFnWithContinuation {}
-
- assertEquals(
- PCollection.IsBounded.UNBOUNDED,
- DoFnSignatures.getSignature(UnboundedFnWithContinuation.class).isBoundedPerElement());
- }
@Test
public void testUnsplittableIsBounded() throws Exception {
class UnsplittableFn extends DoFn<Integer, String> {
@@ -231,10 +172,8 @@ public class DoFnSignaturesSplittableDoFnTest {
public void testSplittableWithAllFunctions() throws Exception {
class GoodSplittableDoFn extends DoFn<Integer, String> {
@ProcessElement
- public ProcessContinuation processElement(
- ProcessContext context, SomeRestrictionTracker tracker) {
- return null;
- }
+ public void processElement(
+ ProcessContext context, SomeRestrictionTracker tracker) {}
@GetInitialRestriction
public SomeRestriction getInitialRestriction(Integer element) {
@@ -259,7 +198,6 @@ public class DoFnSignaturesSplittableDoFnTest {
DoFnSignature signature = DoFnSignatures.getSignature(GoodSplittableDoFn.class);
assertEquals(SomeRestrictionTracker.class, signature.processElement().trackerT().getRawType());
assertTrue(signature.processElement().isSplittable());
- assertTrue(signature.processElement().hasReturnValue());
assertEquals(
SomeRestriction.class, signature.getInitialRestriction().restrictionT().getRawType());
assertEquals(SomeRestriction.class, signature.splitRestriction().restrictionT().getRawType());
@@ -276,9 +214,7 @@ public class DoFnSignaturesSplittableDoFnTest {
public void testSplittableWithAllFunctionsGeneric() throws Exception {
class GoodGenericSplittableDoFn<RestrictionT, TrackerT, CoderT> extends DoFn<Integer, String> {
@ProcessElement
- public ProcessContinuation processElement(ProcessContext context, TrackerT tracker) {
- return null;
- }
+ public void processElement(ProcessContext context, TrackerT tracker) {}
@GetInitialRestriction
public RestrictionT getInitialRestriction(Integer element) {
@@ -306,7 +242,6 @@ public class DoFnSignaturesSplittableDoFnTest {
SomeRestriction, SomeRestrictionTracker, SomeRestrictionCoder>() {}.getClass());
assertEquals(SomeRestrictionTracker.class, signature.processElement().trackerT().getRawType());
assertTrue(signature.processElement().isSplittable());
- assertTrue(signature.processElement().hasReturnValue());
assertEquals(
SomeRestriction.class, signature.getInitialRestriction().restrictionT().getRawType());
assertEquals(SomeRestriction.class, signature.splitRestriction().restrictionT().getRawType());
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
index 70c8dfd..cffb0ad 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
@@ -29,7 +29,6 @@ import static org.junit.Assert.fail;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
-import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
@@ -330,19 +329,6 @@ public class DoFnSignaturesTest {
}
@Test
- public void testPipelineOptionsParameter() throws Exception {
- DoFnSignature sig =
- DoFnSignatures.getSignature(new DoFn<String, String>() {
- @ProcessElement
- public void process(ProcessContext c, PipelineOptions options) {}
- }.getClass());
-
- assertThat(
- sig.processElement().extraParameters(),
- Matchers.<Parameter>hasItem(instanceOf(Parameter.PipelineOptionsParameter.class)));
- }
-
- @Test
public void testDeclAndUsageOfTimerInSuperclass() throws Exception {
DoFnSignature sig =
DoFnSignatures.getSignature(new DoFnOverridingAbstractTimerUse().getClass());
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTrackerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTrackerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTrackerTest.java
index 8aed6b9..831894c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTrackerTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTrackerTest.java
@@ -21,7 +21,6 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
-import org.apache.beam.sdk.io.range.OffsetRange;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/SlidingWindowsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/SlidingWindowsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/SlidingWindowsTest.java
index bfd01f0..b14e221 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/SlidingWindowsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/SlidingWindowsTest.java
@@ -21,7 +21,6 @@ import static org.apache.beam.sdk.testing.WindowFnTestUtils.runWindowFn;
import static org.apache.beam.sdk.testing.WindowFnTestUtils.set;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
@@ -56,12 +55,11 @@ public class SlidingWindowsTest {
expected.put(new IntervalWindow(new Instant(0), new Instant(10)), set(1, 2, 5, 9));
expected.put(new IntervalWindow(new Instant(5), new Instant(15)), set(5, 9, 10, 11));
expected.put(new IntervalWindow(new Instant(10), new Instant(20)), set(10, 11));
- SlidingWindows windowFn = SlidingWindows.of(new Duration(10)).every(new Duration(5));
assertEquals(
expected,
- runWindowFn(windowFn,
+ runWindowFn(
+ SlidingWindows.of(new Duration(10)).every(new Duration(5)),
Arrays.asList(1L, 2L, 5L, 9L, 10L, 11L)));
- assertThat(windowFn.assignsToOneWindow(), is(false));
}
@Test
@@ -71,27 +69,11 @@ public class SlidingWindowsTest {
expected.put(new IntervalWindow(new Instant(0), new Instant(7)), set(1, 2, 5));
expected.put(new IntervalWindow(new Instant(5), new Instant(12)), set(5, 9, 10, 11));
expected.put(new IntervalWindow(new Instant(10), new Instant(17)), set(10, 11));
- SlidingWindows windowFn = SlidingWindows.of(new Duration(7)).every(new Duration(5));
- assertEquals(
- expected,
- runWindowFn(windowFn,
- Arrays.asList(1L, 2L, 5L, 9L, 10L, 11L)));
- assertThat(windowFn.assignsToOneWindow(), is(false));
- }
-
- @Test
- public void testEqualSize() throws Exception {
- Map<IntervalWindow, Set<String>> expected = new HashMap<>();
- expected.put(new IntervalWindow(new Instant(0), new Instant(3)), set(1, 2));
- expected.put(new IntervalWindow(new Instant(3), new Instant(6)), set(3, 4, 5));
- expected.put(new IntervalWindow(new Instant(6), new Instant(9)), set(6, 7));
- SlidingWindows windowFn = SlidingWindows.of(new Duration(3)).every(new Duration(3));
assertEquals(
expected,
runWindowFn(
- windowFn,
- Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L)));
- assertThat(windowFn.assignsToOneWindow(), is(true));
+ SlidingWindows.of(new Duration(7)).every(new Duration(5)),
+ Arrays.asList(1L, 2L, 5L, 9L, 10L, 11L)));
}
@Test
@@ -100,14 +82,12 @@ public class SlidingWindowsTest {
expected.put(new IntervalWindow(new Instant(0), new Instant(3)), set(1, 2));
expected.put(new IntervalWindow(new Instant(10), new Instant(13)), set(10, 11));
expected.put(new IntervalWindow(new Instant(100), new Instant(103)), set(100));
- SlidingWindows windowFn = SlidingWindows.of(new Duration(3)).every(new Duration(10));
assertEquals(
expected,
runWindowFn(
// Only look at the first 3 millisecs of every 10-millisec interval.
- windowFn,
+ SlidingWindows.of(new Duration(3)).every(new Duration(10)),
Arrays.asList(1L, 2L, 3L, 5L, 9L, 10L, 11L, 100L)));
- assertThat(windowFn.assignsToOneWindow(), is(true));
}
@Test
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/extensions/google-cloud-platform-core/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/pom.xml b/sdks/java/extensions/google-cloud-platform-core/pom.xml
index 7d54990..e4e951b 100644
--- a/sdks/java/extensions/google-cloud-platform-core/pom.xml
+++ b/sdks/java/extensions/google-cloud-platform-core/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-extensions-parent</artifactId>
- <version>2.2.0-SNAPSHOT</version>
+ <version>2.1.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
index d7205bf..8d1fe74 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
@@ -135,7 +135,7 @@ public class GcsUtil {
private static final int MAX_CONCURRENT_BATCHES = 256;
private static final FluentBackoff BACKOFF_FACTORY =
- FluentBackoff.DEFAULT.withMaxRetries(10).withInitialBackoff(Duration.standardSeconds(1));
+ FluentBackoff.DEFAULT.withMaxRetries(3).withInitialBackoff(Duration.millis(200));
/////////////////////////////////////////////////////////////////////////////
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/RetryHttpRequestInitializer.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/RetryHttpRequestInitializer.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/RetryHttpRequestInitializer.java
index fd908cf..e5b48d3 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/RetryHttpRequestInitializer.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/util/RetryHttpRequestInitializer.java
@@ -17,9 +17,8 @@
*/
package org.apache.beam.sdk.util;
-import static com.google.api.client.util.BackOffUtils.next;
-
-import com.google.api.client.http.HttpIOExceptionHandler;
+import com.google.api.client.http.HttpBackOffIOExceptionHandler;
+import com.google.api.client.http.HttpBackOffUnsuccessfulResponseHandler;
import com.google.api.client.http.HttpRequest;
import com.google.api.client.http.HttpRequestInitializer;
import com.google.api.client.http.HttpResponse;
@@ -61,106 +60,64 @@ public class RetryHttpRequestInitializer implements HttpRequestInitializer {
*/
private static final int HANGING_GET_TIMEOUT_SEC = 80;
- /** Handlers used to provide additional logging information on unsuccessful HTTP requests. */
- private static class LoggingHttpBackOffHandler
- implements HttpIOExceptionHandler, HttpUnsuccessfulResponseHandler {
-
- private final Sleeper sleeper;
- private final BackOff ioExceptionBackOff;
- private final BackOff unsuccessfulResponseBackOff;
- private final Set<Integer> ignoredResponseCodes;
- private int ioExceptionRetries;
- private int unsuccessfulResponseRetries;
-
- private LoggingHttpBackOffHandler(
- Sleeper sleeper,
- BackOff ioExceptionBackOff,
- BackOff unsucessfulResponseBackOff,
- Set<Integer> ignoredResponseCodes) {
- this.sleeper = sleeper;
- this.ioExceptionBackOff = ioExceptionBackOff;
- this.unsuccessfulResponseBackOff = unsucessfulResponseBackOff;
- this.ignoredResponseCodes = ignoredResponseCodes;
+ private static class LoggingHttpBackOffIOExceptionHandler
+ extends HttpBackOffIOExceptionHandler {
+ public LoggingHttpBackOffIOExceptionHandler(BackOff backOff) {
+ super(backOff);
}
@Override
public boolean handleIOException(HttpRequest request, boolean supportsRetry)
throws IOException {
- // We will retry if the request supports retry or the backoff was successful.
- // Note that the order of these checks is important since
- // backOffWasSuccessful will perform a sleep.
- boolean willRetry = supportsRetry && backOffWasSuccessful(ioExceptionBackOff);
+ boolean willRetry = super.handleIOException(request, supportsRetry);
if (willRetry) {
- ioExceptionRetries += 1;
LOG.debug("Request failed with IOException, will retry: {}", request.getUrl());
} else {
- String message = "Request failed with IOException, "
- + "performed {} retries due to IOExceptions, "
- + "performed {} retries due to unsuccessful status codes, "
- + "HTTP framework says request {} be retried, "
- + "(caller responsible for retrying): {}";
- LOG.warn(message,
- ioExceptionRetries,
- unsuccessfulResponseRetries,
- supportsRetry ? "can" : "cannot",
+ LOG.warn(
+ "Request failed with IOException (caller responsible for retrying): {}",
request.getUrl());
}
return willRetry;
}
+ }
+
+ private static class LoggingHttpBackoffUnsuccessfulResponseHandler
+ implements HttpUnsuccessfulResponseHandler {
+ private final HttpBackOffUnsuccessfulResponseHandler handler;
+ private final Set<Integer> ignoredResponseCodes;
+
+ public LoggingHttpBackoffUnsuccessfulResponseHandler(BackOff backoff,
+ Sleeper sleeper, Set<Integer> ignoredResponseCodes) {
+ this.ignoredResponseCodes = ignoredResponseCodes;
+ handler = new HttpBackOffUnsuccessfulResponseHandler(backoff);
+ handler.setSleeper(sleeper);
+ handler.setBackOffRequired(
+ new HttpBackOffUnsuccessfulResponseHandler.BackOffRequired() {
+ @Override
+ public boolean isRequired(HttpResponse response) {
+ int statusCode = response.getStatusCode();
+ return (statusCode / 100 == 5) || // 5xx: server error
+ statusCode == 429; // 429: Too many requests
+ }
+ });
+ }
@Override
- public boolean handleResponse(HttpRequest request, HttpResponse response, boolean supportsRetry)
- throws IOException {
- // We will retry if the request supports retry and the status code requires a backoff
- // and the backoff was successful. Note that the order of these checks is important since
- // backOffWasSuccessful will perform a sleep.
- boolean willRetry = supportsRetry
- && retryOnStatusCode(response.getStatusCode())
- && backOffWasSuccessful(unsuccessfulResponseBackOff);
- if (willRetry) {
- unsuccessfulResponseRetries += 1;
+ public boolean handleResponse(HttpRequest request, HttpResponse response,
+ boolean supportsRetry) throws IOException {
+ boolean retry = handler.handleResponse(request, response, supportsRetry);
+ if (retry) {
LOG.debug("Request failed with code {}, will retry: {}",
response.getStatusCode(), request.getUrl());
- } else {
- String message = "Request failed with code {}, "
- + "performed {} retries due to IOExceptions, "
- + "performed {} retries due to unsuccessful status codes, "
- + "HTTP framework says request {} be retried, "
- + "(caller responsible for retrying): {}";
- if (ignoredResponseCodes.contains(response.getStatusCode())) {
- // Log ignored response codes at a lower level
- LOG.debug(message,
- response.getStatusCode(),
- ioExceptionRetries,
- unsuccessfulResponseRetries,
- supportsRetry ? "can" : "cannot",
- request.getUrl());
- } else {
- LOG.warn(message,
- response.getStatusCode(),
- ioExceptionRetries,
- unsuccessfulResponseRetries,
- supportsRetry ? "can" : "cannot",
- request.getUrl());
- }
- }
- return willRetry;
- }
- /** Returns true iff performing the backoff was successful. */
- private boolean backOffWasSuccessful(BackOff backOff) {
- try {
- return next(sleeper, backOff);
- } catch (InterruptedException | IOException e) {
- return false;
+ } else if (!ignoredResponseCodes.contains(response.getStatusCode())) {
+ LOG.warn(
+ "Request failed with code {} (caller responsible for retrying): {}",
+ response.getStatusCode(),
+ request.getUrl());
}
- }
- /** Returns true iff the {@code statusCode} represents an error that should be retried. */
- private boolean retryOnStatusCode(int statusCode) {
- return (statusCode == 0) // Code 0 usually means no response / network error
- || (statusCode / 100 == 5) // 5xx: server error
- || statusCode == 429; // 429: Too many requests
+ return retry;
}
}
@@ -216,20 +173,20 @@ public class RetryHttpRequestInitializer implements HttpRequestInitializer {
// TODO: Do this exclusively for work requests.
request.setReadTimeout(HANGING_GET_TIMEOUT_SEC * 1000);
- LoggingHttpBackOffHandler loggingHttpBackOffHandler = new LoggingHttpBackOffHandler(
- sleeper,
- // Retry immediately on IOExceptions.
- BackOff.ZERO_BACKOFF,
- // Back off on retryable http errors.
+ // Back off on retryable http errors.
+ request.setUnsuccessfulResponseHandler(
// A back-off multiplier of 2 raises the maximum request retrying time
// to approximately 5 minutes (keeping other back-off parameters to
// their default values).
- new ExponentialBackOff.Builder().setNanoClock(nanoClock).setMultiplier(2).build(),
- ignoredResponseCodes
- );
-
- request.setUnsuccessfulResponseHandler(loggingHttpBackOffHandler);
- request.setIOExceptionHandler(loggingHttpBackOffHandler);
+ new LoggingHttpBackoffUnsuccessfulResponseHandler(
+ new ExponentialBackOff.Builder().setNanoClock(nanoClock)
+ .setMultiplier(2).build(),
+ sleeper, ignoredResponseCodes));
+
+ // Retry immediately on IOExceptions.
+ LoggingHttpBackOffIOExceptionHandler loggingBackoffHandler =
+ new LoggingHttpBackOffIOExceptionHandler(BackOff.ZERO_BACKOFF);
+ request.setIOExceptionHandler(loggingBackoffHandler);
// Set response initializer
if (responseInterceptor != null) {
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/GcpCoreApiSurfaceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/GcpCoreApiSurfaceTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/GcpCoreApiSurfaceTest.java
index a0d9e4b..625c248 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/GcpCoreApiSurfaceTest.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/GcpCoreApiSurfaceTest.java
@@ -15,16 +15,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.beam.sdk.extensions.gcp;
+package org.apache.beam;
-import static org.apache.beam.sdk.util.ApiSurface.classesInPackage;
-import static org.apache.beam.sdk.util.ApiSurface.containsOnlyClassesMatching;
+import static org.apache.beam.sdk.util.ApiSurface.containsOnlyPackages;
import static org.hamcrest.MatcherAssert.assertThat;
import com.google.common.collect.ImmutableSet;
import java.util.Set;
import org.apache.beam.sdk.util.ApiSurface;
-import org.hamcrest.Matcher;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -34,32 +32,28 @@ import org.junit.runners.JUnit4;
public class GcpCoreApiSurfaceTest {
@Test
- public void testGcpCoreApiSurface() throws Exception {
- final Package thisPackage = getClass().getPackage();
- final ClassLoader thisClassLoader = getClass().getClassLoader();
- final ApiSurface apiSurface =
- ApiSurface.ofPackage(thisPackage, thisClassLoader)
- .pruningPattern("org[.]apache[.]beam[.].*Test.*")
- .pruningPattern("org[.]apache[.]beam[.].*IT")
- .pruningPattern("java[.]lang.*")
- .pruningPattern("java[.]util.*");
+ public void testApiSurface() throws Exception {
@SuppressWarnings("unchecked")
- final Set<Matcher<Class<?>>> allowedClasses =
+ final Set<String> allowed =
ImmutableSet.of(
- classesInPackage("com.google.api.client.googleapis"),
- classesInPackage("com.google.api.client.http"),
- classesInPackage("com.google.api.client.json"),
- classesInPackage("com.google.api.client.util"),
- classesInPackage("com.google.api.services.storage"),
- classesInPackage("com.google.auth"),
- classesInPackage("com.fasterxml.jackson.annotation"),
- classesInPackage("java"),
- classesInPackage("javax"),
- classesInPackage("org.apache.beam.sdk"),
- classesInPackage("org.joda.time")
- );
+ "org.apache.beam",
+ "com.google.api.client",
+ "com.google.api.services.storage",
+ "com.google.auth",
+ "com.fasterxml.jackson.annotation",
+ "com.fasterxml.jackson.core",
+ "com.fasterxml.jackson.databind",
+ "org.apache.avro",
+ "org.hamcrest",
+ // via DataflowMatchers
+ "org.codehaus.jackson",
+ // via Avro
+ "org.joda.time",
+ "org.junit",
+ "sun.reflect");
- assertThat(apiSurface, containsOnlyClassesMatching(allowedClasses));
+ assertThat(
+ ApiSurface.getSdkApiSurface(getClass().getClassLoader()), containsOnlyPackages(allowed));
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/RetryHttpRequestInitializerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/RetryHttpRequestInitializerTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/RetryHttpRequestInitializerTest.java
index 13a9309..37551a4 100644
--- a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/RetryHttpRequestInitializerTest.java
+++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/util/RetryHttpRequestInitializerTest.java
@@ -49,11 +49,10 @@ import java.net.SocketTimeoutException;
import java.security.PrivateKey;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicLong;
-import org.apache.beam.sdk.testing.ExpectedLogs;
import org.hamcrest.Matchers;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
-import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -68,8 +67,6 @@ import org.mockito.stubbing.Answer;
@RunWith(JUnit4.class)
public class RetryHttpRequestInitializerTest {
- @Rule public ExpectedLogs expectedLogs = ExpectedLogs.none(RetryHttpRequestInitializer.class);
-
@Mock private PrivateKey mockPrivateKey;
@Mock private LowLevelHttpRequest mockLowLevelRequest;
@Mock private LowLevelHttpResponse mockLowLevelResponse;
@@ -138,7 +135,6 @@ public class RetryHttpRequestInitializerTest {
verify(mockLowLevelRequest).setTimeout(anyInt(), anyInt());
verify(mockLowLevelRequest).execute();
verify(mockLowLevelResponse).getStatusCode();
- expectedLogs.verifyNotLogged("Request failed");
}
/**
@@ -157,7 +153,7 @@ public class RetryHttpRequestInitializerTest {
HttpResponse response = result.executeUnparsed();
assertNotNull(response);
} catch (HttpResponseException e) {
- assertThat(e.getMessage(), Matchers.containsString("403"));
+ Assert.assertThat(e.getMessage(), Matchers.containsString("403"));
}
verify(mockHttpResponseInterceptor).interceptResponse(any(HttpResponse.class));
@@ -166,7 +162,6 @@ public class RetryHttpRequestInitializerTest {
verify(mockLowLevelRequest).setTimeout(anyInt(), anyInt());
verify(mockLowLevelRequest).execute();
verify(mockLowLevelResponse).getStatusCode();
- expectedLogs.verifyWarn("Request failed with code 403");
}
/**
@@ -193,7 +188,6 @@ public class RetryHttpRequestInitializerTest {
verify(mockLowLevelRequest, times(3)).setTimeout(anyInt(), anyInt());
verify(mockLowLevelRequest, times(3)).execute();
verify(mockLowLevelResponse, times(3)).getStatusCode();
- expectedLogs.verifyDebug("Request failed with code 503");
}
/**
@@ -217,7 +211,6 @@ public class RetryHttpRequestInitializerTest {
verify(mockLowLevelRequest, times(2)).setTimeout(anyInt(), anyInt());
verify(mockLowLevelRequest, times(2)).execute();
verify(mockLowLevelResponse).getStatusCode();
- expectedLogs.verifyDebug("Request failed with IOException");
}
/**
@@ -231,22 +224,19 @@ public class RetryHttpRequestInitializerTest {
int n = 0;
@Override
public Integer answer(InvocationOnMock invocation) {
- return n++ < retries ? 503 : 9999;
+ return (n++ < retries - 1) ? 503 : 200;
}});
Storage.Buckets.Get result = storage.buckets().get("test");
- try {
- result.executeUnparsed();
- fail();
- } catch (Throwable t) {
- }
+ HttpResponse response = result.executeUnparsed();
+ assertNotNull(response);
verify(mockHttpResponseInterceptor).interceptResponse(any(HttpResponse.class));
- verify(mockLowLevelRequest, atLeastOnce()).addHeader(anyString(), anyString());
- verify(mockLowLevelRequest, times(retries + 1)).setTimeout(anyInt(), anyInt());
- verify(mockLowLevelRequest, times(retries + 1)).execute();
- verify(mockLowLevelResponse, times(retries + 1)).getStatusCode();
- expectedLogs.verifyWarn("performed 10 retries due to unsuccessful status codes");
+ verify(mockLowLevelRequest, atLeastOnce()).addHeader(anyString(),
+ anyString());
+ verify(mockLowLevelRequest, times(retries)).setTimeout(anyInt(), anyInt());
+ verify(mockLowLevelRequest, times(retries)).execute();
+ verify(mockLowLevelResponse, times(retries)).getStatusCode();
}
/**
@@ -286,7 +276,6 @@ public class RetryHttpRequestInitializerTest {
} catch (Throwable e) {
assertThat(e, Matchers.<Throwable>instanceOf(SocketTimeoutException.class));
assertEquals(1 + defaultNumberOfRetries, executeCount.get());
- expectedLogs.verifyWarn("performed 10 retries due to IOExceptions");
}
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/extensions/jackson/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/jackson/pom.xml b/sdks/java/extensions/jackson/pom.xml
index 7fd38e0..4b09c11 100644
--- a/sdks/java/extensions/jackson/pom.xml
+++ b/sdks/java/extensions/jackson/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-extensions-parent</artifactId>
- <version>2.2.0-SNAPSHOT</version>
+ <version>2.1.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/extensions/join-library/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/join-library/pom.xml b/sdks/java/extensions/join-library/pom.xml
index ea24b75..556ec40 100644
--- a/sdks/java/extensions/join-library/pom.xml
+++ b/sdks/java/extensions/join-library/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-extensions-parent</artifactId>
- <version>2.2.0-SNAPSHOT</version>
+ <version>2.1.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/extensions/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/pom.xml b/sdks/java/extensions/pom.xml
index 1222476..3d63626 100644
--- a/sdks/java/extensions/pom.xml
+++ b/sdks/java/extensions/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-parent</artifactId>
- <version>2.2.0-SNAPSHOT</version>
+ <version>2.1.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/extensions/protobuf/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/protobuf/pom.xml b/sdks/java/extensions/protobuf/pom.xml
index 63855f8..ae909ab 100644
--- a/sdks/java/extensions/protobuf/pom.xml
+++ b/sdks/java/extensions/protobuf/pom.xml
@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-extensions-parent</artifactId>
- <version>2.2.0-SNAPSHOT</version>
+ <version>2.1.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/extensions/sorter/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sorter/pom.xml b/sdks/java/extensions/sorter/pom.xml
index 395c73f..9d25f9d 100644
--- a/sdks/java/extensions/sorter/pom.xml
+++ b/sdks/java/extensions/sorter/pom.xml
@@ -22,13 +22,17 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-extensions-parent</artifactId>
- <version>2.2.0-SNAPSHOT</version>
+ <version>2.1.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<artifactId>beam-sdks-java-extensions-sorter</artifactId>
<name>Apache Beam :: SDKs :: Java :: Extensions :: Sorter</name>
+ <properties>
+ <hadoop.version>2.7.1</hadoop.version>
+ </properties>
+
<dependencies>
<dependency>
<groupId>org.apache.beam</groupId>
@@ -38,12 +42,14 @@
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
+ <version>${hadoop.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
+ <version>${hadoop.version}</version>
<scope>provided</scope>
</dependency>
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/harness/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/harness/pom.xml b/sdks/java/harness/pom.xml
index fe5c2f1..61a170a 100644
--- a/sdks/java/harness/pom.xml
+++ b/sdks/java/harness/pom.xml
@@ -23,7 +23,7 @@
<parent>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-parent</artifactId>
- <version>2.2.0-SNAPSHOT</version>
+ <version>2.1.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
@@ -83,11 +83,6 @@
<dependency>
<groupId>org.apache.beam</groupId>
- <artifactId>beam-runners-core-construction-java</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.apache.beam</groupId>
<artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
</dependency>
@@ -155,21 +150,10 @@
</dependency>
<dependency>
- <groupId>joda-time</groupId>
- <artifactId>joda-time</artifactId>
- </dependency>
-
- <dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
- <dependency>
- <groupId>com.google.auto.service</groupId>
- <artifactId>auto-service</artifactId>
- <optional>true</optional>
- </dependency>
-
<!-- test dependencies -->
<dependency>
<groupId>org.hamcrest</groupId>
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
index 2a9cef8..e33277a 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java
@@ -18,111 +18,82 @@
package org.apache.beam.fn.harness.control;
-import com.google.common.annotations.VisibleForTesting;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.collect.Iterables.getOnlyElement;
+
+import com.google.common.collect.Collections2;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Multimap;
-import com.google.common.collect.Sets;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.BytesValue;
+import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.ServiceLoader;
-import java.util.Set;
+import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.beam.fn.harness.data.BeamFnDataClient;
+import org.apache.beam.fn.harness.fake.FakeStepContext;
import org.apache.beam.fn.harness.fn.ThrowingConsumer;
import org.apache.beam.fn.harness.fn.ThrowingRunnable;
import org.apache.beam.fn.v1.BeamFnApi;
-import org.apache.beam.runners.core.PTransformRunnerFactory;
-import org.apache.beam.runners.core.PTransformRunnerFactory.Registrar;
+import org.apache.beam.runners.core.BeamFnDataReadRunner;
+import org.apache.beam.runners.core.BeamFnDataWriteRunner;
+import org.apache.beam.runners.core.BoundedSourceRunner;
+import org.apache.beam.runners.core.DoFnRunner;
+import org.apache.beam.runners.core.DoFnRunners;
+import org.apache.beam.runners.core.DoFnRunners.OutputManager;
+import org.apache.beam.runners.core.NullSideInputReader;
+import org.apache.beam.runners.dataflow.util.DoFnInfo;
import org.apache.beam.sdk.common.runner.v1.RunnerApi;
+import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.common.ReflectHelpers;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Processes {@link org.apache.beam.fn.v1.BeamFnApi.ProcessBundleRequest}s by materializing
- * the set of required runners for each {@link RunnerApi.FunctionSpec},
+ * the set of required runners for each {@link org.apache.beam.fn.v1.BeamFnApi.FunctionSpec},
* wiring them together based upon the {@code input} and {@code output} map definitions.
*
* <p>Finally executes the DAG based graph by starting all runners in reverse topological order,
* and finishing all runners in forward topological order.
*/
public class ProcessBundleHandler {
-
// TODO: What should the initial set of URNs be?
private static final String DATA_INPUT_URN = "urn:org.apache.beam:source:runner:0.1";
- public static final String JAVA_SOURCE_URN = "urn:org.apache.beam:source:java:0.1";
+ private static final String DATA_OUTPUT_URN = "urn:org.apache.beam:sink:runner:0.1";
+ private static final String JAVA_DO_FN_URN = "urn:org.apache.beam:dofn:java:0.1";
+ private static final String JAVA_SOURCE_URN = "urn:org.apache.beam:source:java:0.1";
private static final Logger LOG = LoggerFactory.getLogger(ProcessBundleHandler.class);
- private static final Map<String, PTransformRunnerFactory> REGISTERED_RUNNER_FACTORIES;
-
- static {
- Set<Registrar> pipelineRunnerRegistrars =
- Sets.newTreeSet(ReflectHelpers.ObjectsClassComparator.INSTANCE);
- pipelineRunnerRegistrars.addAll(
- Lists.newArrayList(ServiceLoader.load(Registrar.class,
- ReflectHelpers.findClassLoader())));
-
- // Load all registered PTransform runner factories.
- ImmutableMap.Builder<String, PTransformRunnerFactory> builder =
- ImmutableMap.builder();
- for (Registrar registrar : pipelineRunnerRegistrars) {
- builder.putAll(registrar.getPTransformRunnerFactories());
- }
- REGISTERED_RUNNER_FACTORIES = builder.build();
- }
private final PipelineOptions options;
private final Function<String, Message> fnApiRegistry;
private final BeamFnDataClient beamFnDataClient;
- private final Map<String, PTransformRunnerFactory> urnToPTransformRunnerFactoryMap;
- private final PTransformRunnerFactory defaultPTransformRunnerFactory;
-
public ProcessBundleHandler(
PipelineOptions options,
Function<String, Message> fnApiRegistry,
BeamFnDataClient beamFnDataClient) {
- this(options, fnApiRegistry, beamFnDataClient, REGISTERED_RUNNER_FACTORIES);
- }
-
- @VisibleForTesting
- ProcessBundleHandler(
- PipelineOptions options,
- Function<String, Message> fnApiRegistry,
- BeamFnDataClient beamFnDataClient,
- Map<String, PTransformRunnerFactory> urnToPTransformRunnerFactoryMap) {
this.options = options;
this.fnApiRegistry = fnApiRegistry;
this.beamFnDataClient = beamFnDataClient;
- this.urnToPTransformRunnerFactoryMap = urnToPTransformRunnerFactoryMap;
- this.defaultPTransformRunnerFactory = new PTransformRunnerFactory<Object>() {
- @Override
- public Object createRunnerForPTransform(
- PipelineOptions pipelineOptions,
- BeamFnDataClient beamFnDataClient,
- String pTransformId,
- RunnerApi.PTransform pTransform,
- Supplier<String> processBundleInstructionId,
- Map<String, RunnerApi.PCollection> pCollections,
- Map<String, RunnerApi.Coder> coders,
- Multimap<String, ThrowingConsumer<WindowedValue<?>>> pCollectionIdsToConsumers,
- Consumer<ThrowingRunnable> addStartFunction,
- Consumer<ThrowingRunnable> addFinishFunction) {
- throw new IllegalStateException(String.format(
- "No factory registered for %s, known factories %s",
- pTransform.getSpec().getUrn(),
- urnToPTransformRunnerFactoryMap.keySet()));
- }
- };
}
private void createRunnerAndConsumersForPTransformRecursively(
@@ -157,19 +128,115 @@ public class ProcessBundleHandler {
}
}
- urnToPTransformRunnerFactoryMap.getOrDefault(
- pTransform.getSpec().getUrn(), defaultPTransformRunnerFactory)
- .createRunnerForPTransform(
- options,
- beamFnDataClient,
- pTransformId,
- pTransform,
- processBundleInstructionId,
- processBundleDescriptor.getPcollectionsMap(),
- processBundleDescriptor.getCodersMap(),
- pCollectionIdsToConsumers,
- addStartFunction,
- addFinishFunction);
+ createRunnerForPTransform(
+ pTransformId,
+ pTransform,
+ processBundleInstructionId,
+ processBundleDescriptor.getPcollectionsMap(),
+ pCollectionIdsToConsumers,
+ addStartFunction,
+ addFinishFunction);
+ }
+
+ protected void createRunnerForPTransform(
+ String pTransformId,
+ RunnerApi.PTransform pTransform,
+ Supplier<String> processBundleInstructionId,
+ Map<String, RunnerApi.PCollection> pCollections,
+ Multimap<String, ThrowingConsumer<WindowedValue<?>>> pCollectionIdsToConsumers,
+ Consumer<ThrowingRunnable> addStartFunction,
+ Consumer<ThrowingRunnable> addFinishFunction) throws IOException {
+
+
+ // For every output PCollection, create a map from output name to Consumer
+ ImmutableMap.Builder<String, Collection<ThrowingConsumer<WindowedValue<?>>>>
+ outputMapBuilder = ImmutableMap.builder();
+ for (Map.Entry<String, String> entry : pTransform.getOutputsMap().entrySet()) {
+ outputMapBuilder.put(
+ entry.getKey(),
+ pCollectionIdsToConsumers.get(entry.getValue()));
+ }
+ ImmutableMap<String, Collection<ThrowingConsumer<WindowedValue<?>>>> outputMap =
+ outputMapBuilder.build();
+
+
+ // Based upon the function spec, populate the start/finish/consumer information.
+ RunnerApi.FunctionSpec functionSpec = pTransform.getSpec();
+ ThrowingConsumer<WindowedValue<?>> consumer;
+ switch (functionSpec.getUrn()) {
+ default:
+ BeamFnApi.Target target;
+ RunnerApi.Coder coderSpec;
+ throw new IllegalArgumentException(
+ String.format("Unknown FunctionSpec %s", functionSpec));
+
+ case DATA_OUTPUT_URN:
+ target = BeamFnApi.Target.newBuilder()
+ .setPrimitiveTransformReference(pTransformId)
+ .setName(getOnlyElement(pTransform.getInputsMap().keySet()))
+ .build();
+ coderSpec = (RunnerApi.Coder) fnApiRegistry.apply(
+ pCollections.get(getOnlyElement(pTransform.getInputsMap().values())).getCoderId());
+ BeamFnDataWriteRunner<Object> remoteGrpcWriteRunner =
+ new BeamFnDataWriteRunner<Object>(
+ functionSpec,
+ processBundleInstructionId,
+ target,
+ coderSpec,
+ beamFnDataClient);
+ addStartFunction.accept(remoteGrpcWriteRunner::registerForOutput);
+ consumer = (ThrowingConsumer)
+ (ThrowingConsumer<WindowedValue<Object>>) remoteGrpcWriteRunner::consume;
+ addFinishFunction.accept(remoteGrpcWriteRunner::close);
+ break;
+
+ case DATA_INPUT_URN:
+ target = BeamFnApi.Target.newBuilder()
+ .setPrimitiveTransformReference(pTransformId)
+ .setName(getOnlyElement(pTransform.getOutputsMap().keySet()))
+ .build();
+ coderSpec = (RunnerApi.Coder) fnApiRegistry.apply(
+ pCollections.get(getOnlyElement(pTransform.getOutputsMap().values())).getCoderId());
+ BeamFnDataReadRunner<?> remoteGrpcReadRunner =
+ new BeamFnDataReadRunner<Object>(
+ functionSpec,
+ processBundleInstructionId,
+ target,
+ coderSpec,
+ beamFnDataClient,
+ (Map) outputMap);
+ addStartFunction.accept(remoteGrpcReadRunner::registerInputLocation);
+ consumer = null;
+ addFinishFunction.accept(remoteGrpcReadRunner::blockTillReadFinishes);
+ break;
+
+ case JAVA_DO_FN_URN:
+ DoFnRunner<Object, Object> doFnRunner = createDoFnRunner(functionSpec, (Map) outputMap);
+ addStartFunction.accept(doFnRunner::startBundle);
+ consumer = (ThrowingConsumer)
+ (ThrowingConsumer<WindowedValue<Object>>) doFnRunner::processElement;
+ addFinishFunction.accept(doFnRunner::finishBundle);
+ break;
+
+ case JAVA_SOURCE_URN:
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ BoundedSourceRunner<BoundedSource<Object>, Object> sourceRunner =
+ createBoundedSourceRunner(functionSpec, (Map) outputMap);
+ // TODO: Remove and replace with source being sent across gRPC port
+ addStartFunction.accept(sourceRunner::start);
+ consumer = (ThrowingConsumer)
+ (ThrowingConsumer<WindowedValue<BoundedSource<Object>>>)
+ sourceRunner::runReadLoop;
+ break;
+ }
+
+ // If we created a consumer, add it to the map containing PCollection ids to consumers
+ if (consumer != null) {
+ for (String inputPCollectionId :
+ pTransform.getInputsMap().values()) {
+ pCollectionIdsToConsumers.put(inputPCollectionId, consumer);
+ }
+ }
}
public BeamFnApi.InstructionResponse.Builder processBundle(BeamFnApi.InstructionRequest request)
@@ -232,4 +299,88 @@ public class ProcessBundleHandler {
return response;
}
+
+ /**
+ * Converts a {@link org.apache.beam.fn.v1.BeamFnApi.FunctionSpec} into a {@link DoFnRunner}.
+ */
+ private <InputT, OutputT> DoFnRunner<InputT, OutputT> createDoFnRunner(
+ RunnerApi.FunctionSpec functionSpec,
+ Map<String, Collection<ThrowingConsumer<WindowedValue<OutputT>>>> outputMap) {
+ ByteString serializedFn;
+ try {
+ serializedFn = functionSpec.getParameter().unpack(BytesValue.class).getValue();
+ } catch (InvalidProtocolBufferException e) {
+ throw new IllegalArgumentException(
+ String.format("Unable to unwrap DoFn %s", functionSpec), e);
+ }
+ DoFnInfo<?, ?> doFnInfo =
+ (DoFnInfo<?, ?>)
+ SerializableUtils.deserializeFromByteArray(serializedFn.toByteArray(), "DoFnInfo");
+
+ checkArgument(
+ Objects.equals(
+ new HashSet<>(Collections2.transform(outputMap.keySet(), Long::parseLong)),
+ doFnInfo.getOutputMap().keySet()),
+ "Unexpected mismatch between transform output map %s and DoFnInfo output map %s.",
+ outputMap.keySet(),
+ doFnInfo.getOutputMap());
+
+ ImmutableMultimap.Builder<TupleTag<?>,
+ ThrowingConsumer<WindowedValue<OutputT>>> tagToOutput =
+ ImmutableMultimap.builder();
+ for (Map.Entry<Long, TupleTag<?>> entry : doFnInfo.getOutputMap().entrySet()) {
+ tagToOutput.putAll(entry.getValue(), outputMap.get(Long.toString(entry.getKey())));
+ }
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ final Map<TupleTag<?>, Collection<ThrowingConsumer<WindowedValue<?>>>> tagBasedOutputMap =
+ (Map) tagToOutput.build().asMap();
+
+ OutputManager outputManager =
+ new OutputManager() {
+ Map<TupleTag<?>, Collection<ThrowingConsumer<WindowedValue<?>>>> tupleTagToOutput =
+ tagBasedOutputMap;
+
+ @Override
+ public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
+ try {
+ Collection<ThrowingConsumer<WindowedValue<?>>> consumers =
+ tupleTagToOutput.get(tag);
+ if (consumers == null) {
+ /* This is a normal case, e.g., if a DoFn has output but that output is not
+ * consumed. Drop the output. */
+ return;
+ }
+ for (ThrowingConsumer<WindowedValue<?>> consumer : consumers) {
+ consumer.accept(output);
+ }
+ } catch (Throwable t) {
+ throw new RuntimeException(t);
+ }
+ }
+ };
+
+ @SuppressWarnings({"unchecked", "rawtypes", "deprecation"})
+ DoFnRunner<InputT, OutputT> runner =
+ DoFnRunners.simpleRunner(
+ PipelineOptionsFactory.create(), /* TODO */
+ (DoFn) doFnInfo.getDoFn(),
+ NullSideInputReader.empty(), /* TODO */
+ outputManager,
+ (TupleTag) doFnInfo.getOutputMap().get(doFnInfo.getMainOutput()),
+ new ArrayList<>(doFnInfo.getOutputMap().values()),
+ new FakeStepContext(),
+ (WindowingStrategy) doFnInfo.getWindowingStrategy());
+ return runner;
+ }
+
+ private <InputT extends BoundedSource<OutputT>, OutputT>
+ BoundedSourceRunner<InputT, OutputT> createBoundedSourceRunner(
+ RunnerApi.FunctionSpec functionSpec,
+ Map<String, Collection<ThrowingConsumer<WindowedValue<OutputT>>>> outputMap) {
+
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ BoundedSourceRunner<InputT, OutputT> runner =
+ new BoundedSourceRunner(options, functionSpec, outputMap);
+ return runner;
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/RegisterHandler.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/RegisterHandler.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/RegisterHandler.java
index 0e738ac..276a120 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/RegisterHandler.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/RegisterHandler.java
@@ -79,7 +79,7 @@ public class RegisterHandler {
processBundleDescriptor.getClass());
computeIfAbsent(processBundleDescriptor.getId()).complete(processBundleDescriptor);
for (Map.Entry<String, RunnerApi.Coder> entry
- : processBundleDescriptor.getCodersMap().entrySet()) {
+ : processBundleDescriptor.getCodersyyyMap().entrySet()) {
LOG.debug("Registering {} with type {}",
entry.getKey(),
entry.getValue().getClass());