You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/04/29 23:56:07 UTC
[09/17] incubator-beam git commit: Move InProcessRunner to its own
module
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java
new file mode 100644
index 0000000..236ad17
--- /dev/null
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.apache.beam.runners.direct.InMemoryWatermarkManager.TimerUpdate;
+import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.apache.beam.sdk.util.TimerInternals.TimerData;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.common.CounterSet;
+import org.apache.beam.sdk.util.state.BagState;
+import org.apache.beam.sdk.util.state.StateNamespace;
+import org.apache.beam.sdk.util.state.StateNamespaces;
+import org.apache.beam.sdk.util.state.StateTag;
+import org.apache.beam.sdk.util.state.StateTags;
+import org.apache.beam.sdk.util.state.WatermarkHoldState;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TupleTag;
+
+import org.hamcrest.Matchers;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.io.Serializable;
+
+/**
+ * Tests for {@link ParDoSingleEvaluatorFactory}.
+ *
+ * <p>Every test builds a small pipeline, asks the factory for an evaluator against a mocked
+ * {@link InProcessEvaluationContext}, pushes elements through the evaluator by hand and then
+ * inspects the {@link InProcessTransformResult} returned by {@code finishBundle()}.
+ */
+@RunWith(JUnit4.class)
+public class ParDoSingleEvaluatorFactoryTest implements Serializable {
+  // NOTE(review): presumably transient because the anonymous DoFns below keep a reference to
+  // this (Serializable) test instance and the factory should not be serialized with them.
+  private transient BundleFactory bundleFactory = InProcessBundleFactory.create();
+
+  /**
+   * Runs a DoFn that emits each input string's length and checks that the lengths land in the
+   * bundle handed out by the evaluation context, keeping the timestamp and pane of their inputs,
+   * and that the result reports no watermark hold and the context's counter set.
+   */
+  @Test
+  public void testParDoInMemoryTransformEvaluator() throws Exception {
+    TestPipeline p = TestPipeline.create();
+
+    PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam"));
+    PCollection<Integer> collection =
+        input.apply(
+            ParDo.of(
+                new DoFn<String, Integer>() {
+                  @Override
+                  public void processElement(ProcessContext c) {
+                    c.output(c.element().length());
+                  }
+                }));
+    CommittedBundle<String> inputBundle =
+        bundleFactory.createRootBundle(input).commit(Instant.now());
+
+    InProcessEvaluationContext evaluationContext = mock(InProcessEvaluationContext.class);
+    UncommittedBundle<Integer> outputBundle = bundleFactory.createRootBundle(collection);
+    when(evaluationContext.createBundle(inputBundle, collection)).thenReturn(outputBundle);
+    InProcessExecutionContext executionContext =
+        new InProcessExecutionContext(null, null, null, null);
+    when(evaluationContext.getExecutionContext(collection.getProducingTransformInternal(), null))
+        .thenReturn(executionContext);
+    CounterSet counters = new CounterSet();
+    when(evaluationContext.createCounterSet()).thenReturn(counters);
+
+    // Same package as the test, so the simple name is enough (matches the other tests here).
+    TransformEvaluator<String> evaluator =
+        new ParDoSingleEvaluatorFactory()
+            .forApplication(
+                collection.getProducingTransformInternal(), inputBundle, evaluationContext);
+
+    evaluator.processElement(WindowedValue.valueInGlobalWindow("foo"));
+    evaluator.processElement(
+        WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000)));
+    evaluator.processElement(
+        WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING));
+
+    InProcessTransformResult result = evaluator.finishBundle();
+    assertThat(result.getOutputBundles(), Matchers.<UncommittedBundle<?>>contains(outputBundle));
+    assertThat(result.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE));
+    assertThat(result.getCounters(), equalTo(counters));
+
+    // Lengths of "foo", "bara", "bazam" are 3, 4, 5; each keeps its input's timestamp/pane.
+    assertThat(
+        outputBundle.commit(Instant.now()).getElements(),
+        Matchers.<WindowedValue<Integer>>containsInAnyOrder(
+            WindowedValue.valueInGlobalWindow(3),
+            WindowedValue.timestampedValueInGlobalWindow(4, new Instant(1000)),
+            WindowedValue.valueInGlobalWindow(5, PaneInfo.ON_TIME_AND_ONLY_FIRING)));
+  }
+
+  /**
+   * Runs a DoFn that writes only to a side output tag the ParDo never declared and checks that
+   * evaluation still completes: the result carries the main output bundle, no watermark hold and
+   * the context's counter set.
+   */
+  @Test
+  public void testSideOutputToUndeclaredSideOutputSucceeds() throws Exception {
+    TestPipeline p = TestPipeline.create();
+
+    PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam"));
+    final TupleTag<Integer> sideOutputTag = new TupleTag<Integer>() {};
+    PCollection<Integer> collection =
+        input.apply(
+            ParDo.of(
+                new DoFn<String, Integer>() {
+                  @Override
+                  public void processElement(ProcessContext c) {
+                    c.sideOutput(sideOutputTag, c.element().length());
+                  }
+                }));
+    CommittedBundle<String> inputBundle =
+        bundleFactory.createRootBundle(input).commit(Instant.now());
+
+    InProcessEvaluationContext evaluationContext = mock(InProcessEvaluationContext.class);
+    UncommittedBundle<Integer> outputBundle = bundleFactory.createRootBundle(collection);
+    when(evaluationContext.createBundle(inputBundle, collection)).thenReturn(outputBundle);
+    InProcessExecutionContext executionContext =
+        new InProcessExecutionContext(null, null, null, null);
+    when(evaluationContext.getExecutionContext(collection.getProducingTransformInternal(), null))
+        .thenReturn(executionContext);
+    CounterSet counters = new CounterSet();
+    when(evaluationContext.createCounterSet()).thenReturn(counters);
+
+    TransformEvaluator<String> evaluator =
+        new ParDoSingleEvaluatorFactory()
+            .forApplication(
+                collection.getProducingTransformInternal(), inputBundle, evaluationContext);
+
+    evaluator.processElement(WindowedValue.valueInGlobalWindow("foo"));
+    evaluator.processElement(
+        WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000)));
+    evaluator.processElement(
+        WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING));
+
+    InProcessTransformResult result = evaluator.finishBundle();
+    assertThat(
+        result.getOutputBundles(), Matchers.<UncommittedBundle<?>>containsInAnyOrder(outputBundle));
+    assertThat(result.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MAX_VALUE));
+    assertThat(result.getCounters(), equalTo(counters));
+  }
+
+  /**
+   * Runs a DoFn that adds a watermark hold and a bag entry per element and checks that both the
+   * hold and the state written by the DoFn are visible on the result of {@code finishBundle()}.
+   */
+  @Test
+  public void finishBundleWithStatePutsStateInResult() throws Exception {
+    TestPipeline p = TestPipeline.create();
+
+    PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam"));
+
+    final StateTag<Object, WatermarkHoldState<BoundedWindow>> watermarkTag =
+        StateTags.watermarkStateInternal("myId", OutputTimeFns.outputAtEarliestInputTimestamp());
+    final StateTag<Object, BagState<String>> bagTag = StateTags.bag("myBag", StringUtf8Coder.of());
+    // Used only by the assertions; the DoFn builds an equal namespace of its own.
+    final StateNamespace windowNs =
+        StateNamespaces.window(GlobalWindow.Coder.INSTANCE, GlobalWindow.INSTANCE);
+    ParDo.Bound<String, KV<String, Integer>> pardo =
+        ParDo.of(
+            new DoFn<String, KV<String, Integer>>() {
+              @Override
+              public void processElement(ProcessContext c) {
+                c.windowingInternals()
+                    .stateInternals()
+                    .state(StateNamespaces.global(), watermarkTag)
+                    .add(new Instant(124443L - c.element().length()));
+                c.windowingInternals()
+                    .stateInternals()
+                    .state(
+                        StateNamespaces.window(GlobalWindow.Coder.INSTANCE, GlobalWindow.INSTANCE),
+                        bagTag)
+                    .add(c.element());
+              }
+            });
+    PCollection<KV<String, Integer>> mainOutput = input.apply(pardo);
+
+    CommittedBundle<String> inputBundle =
+        bundleFactory.createRootBundle(input).commit(Instant.now());
+
+    InProcessEvaluationContext evaluationContext = mock(InProcessEvaluationContext.class);
+    UncommittedBundle<KV<String, Integer>> mainOutputBundle =
+        bundleFactory.createRootBundle(mainOutput);
+
+    when(evaluationContext.createBundle(inputBundle, mainOutput)).thenReturn(mainOutputBundle);
+
+    InProcessExecutionContext executionContext =
+        new InProcessExecutionContext(null, "myKey", null, null);
+    when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(), null))
+        .thenReturn(executionContext);
+    CounterSet counters = new CounterSet();
+    when(evaluationContext.createCounterSet()).thenReturn(counters);
+
+    // Same package as the test, so the simple name is enough (matches the other tests here).
+    TransformEvaluator<String> evaluator =
+        new ParDoSingleEvaluatorFactory()
+            .forApplication(
+                mainOutput.getProducingTransformInternal(), inputBundle, evaluationContext);
+
+    evaluator.processElement(WindowedValue.valueInGlobalWindow("foo"));
+    evaluator.processElement(
+        WindowedValue.timestampedValueInGlobalWindow("bara", new Instant(1000)));
+    evaluator.processElement(
+        WindowedValue.valueInGlobalWindow("bazam", PaneInfo.ON_TIME_AND_ONLY_FIRING));
+
+    InProcessTransformResult result = evaluator.finishBundle();
+    // Holds added were 124440, 124439 and 124438 (124443 minus each length); the earliest wins.
+    assertThat(result.getWatermarkHold(), equalTo(new Instant(124438L)));
+    assertThat(result.getState(), not(nullValue()));
+    assertThat(
+        result.getState().state(StateNamespaces.global(), watermarkTag).read(),
+        equalTo(new Instant(124438L)));
+    assertThat(
+        result.getState().state(windowNs, bagTag).read(),
+        containsInAnyOrder("foo", "bara", "bazam"));
+  }
+
+  /**
+   * Runs a DoFn that sets one timer and deletes another and checks that the result's timer
+   * update, keyed by the execution context's key, contains exactly that set and that deletion.
+   */
+  @Test
+  public void finishBundleWithStateAndTimersPutsTimersInResult() throws Exception {
+    TestPipeline p = TestPipeline.create();
+
+    PCollection<String> input = p.apply(Create.of("foo", "bara", "bazam"));
+
+    // Expected timers. The DoFn below rebuilds equal TimerData values inline instead of
+    // capturing these.
+    // NOTE(review): presumably because TimerData cannot be captured by a Serializable DoFn.
+    final TimerData addedTimer =
+        TimerData.of(
+            StateNamespaces.window(
+                IntervalWindow.getCoder(),
+                new IntervalWindow(
+                    new Instant(0).plus(Duration.standardMinutes(5)),
+                    new Instant(1)
+                        .plus(Duration.standardMinutes(5))
+                        .plus(Duration.standardHours(1)))),
+            new Instant(54541L),
+            TimeDomain.EVENT_TIME);
+    final TimerData deletedTimer =
+        TimerData.of(
+            StateNamespaces.window(
+                IntervalWindow.getCoder(),
+                new IntervalWindow(new Instant(0), new Instant(0).plus(Duration.standardHours(1)))),
+            new Instant(3400000),
+            TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
+
+    ParDo.Bound<String, KV<String, Integer>> pardo =
+        ParDo.of(
+            new DoFn<String, KV<String, Integer>>() {
+              @Override
+              public void processElement(ProcessContext c) {
+                // NOTE(review): return value is discarded; presumably called only to touch the
+                // step's state internals - confirm it is still required.
+                c.windowingInternals().stateInternals();
+                c.windowingInternals()
+                    .timerInternals()
+                    .setTimer(
+                        TimerData.of(
+                            StateNamespaces.window(
+                                IntervalWindow.getCoder(),
+                                new IntervalWindow(
+                                    new Instant(0).plus(Duration.standardMinutes(5)),
+                                    new Instant(1)
+                                        .plus(Duration.standardMinutes(5))
+                                        .plus(Duration.standardHours(1)))),
+                            new Instant(54541L),
+                            TimeDomain.EVENT_TIME));
+                c.windowingInternals()
+                    .timerInternals()
+                    .deleteTimer(
+                        TimerData.of(
+                            StateNamespaces.window(
+                                IntervalWindow.getCoder(),
+                                new IntervalWindow(
+                                    new Instant(0),
+                                    new Instant(0).plus(Duration.standardHours(1)))),
+                            new Instant(3400000),
+                            TimeDomain.SYNCHRONIZED_PROCESSING_TIME));
+              }
+            });
+    PCollection<KV<String, Integer>> mainOutput = input.apply(pardo);
+
+    CommittedBundle<String> inputBundle =
+        bundleFactory.createRootBundle(input).commit(Instant.now());
+
+    InProcessEvaluationContext evaluationContext = mock(InProcessEvaluationContext.class);
+    UncommittedBundle<KV<String, Integer>> mainOutputBundle =
+        bundleFactory.createRootBundle(mainOutput);
+
+    when(evaluationContext.createBundle(inputBundle, mainOutput)).thenReturn(mainOutputBundle);
+
+    InProcessExecutionContext executionContext =
+        new InProcessExecutionContext(null, "myKey", null, null);
+    when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(), null))
+        .thenReturn(executionContext);
+    CounterSet counters = new CounterSet();
+    when(evaluationContext.createCounterSet()).thenReturn(counters);
+
+    TransformEvaluator<String> evaluator =
+        new ParDoSingleEvaluatorFactory()
+            .forApplication(
+                mainOutput.getProducingTransformInternal(), inputBundle, evaluationContext);
+
+    // A single element is enough: the DoFn issues the same set/delete for every element.
+    evaluator.processElement(WindowedValue.valueInGlobalWindow("foo"));
+
+    InProcessTransformResult result = evaluator.finishBundle();
+    assertThat(
+        result.getTimerUpdate(),
+        equalTo(
+            TimerUpdate.builder("myKey").setTimer(addedTimer).deletedTimer(deletedTimer).build()));
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TextIOShardedWriteFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TextIOShardedWriteFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TextIOShardedWriteFactoryTest.java
new file mode 100644
index 0000000..49c9061
--- /dev/null
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TextIOShardedWriteFactoryTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.theInstance;
+import static org.junit.Assert.assertThat;
+
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.io.TextIOTest;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+
+import org.hamcrest.Matchers;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.io.File;
+
+/**
+ * Tests for {@link TextIOShardedWriteFactory}.
+ *
+ * <p>Covers both outcomes of {@code override}: the original transform is handed back untouched
+ * when no shard count was requested, and a different transform is returned (and produces the
+ * expected shard files) when an explicit shard count was set.
+ */
+@RunWith(JUnit4.class)
+public class TextIOShardedWriteFactoryTest {
+  @Rule public TemporaryFolder tmp = new TemporaryFolder();
+  private TextIOShardedWriteFactory factory;
+
+  /** Creates a fresh factory for every test. */
+  @Before
+  public void setup() {
+    this.factory = new TextIOShardedWriteFactory();
+  }
+
+  /** A write configured {@code withoutSharding()} must come back as the very same instance. */
+  @Test
+  public void originalWithoutShardingReturnsOriginal() throws Exception {
+    File target = tmp.newFile("foo");
+    PTransform<PCollection<String>, PDone> unsharded =
+        TextIO.Write.to(target.getAbsolutePath()).withoutSharding();
+
+    PTransform<PCollection<String>, PDone> replacement = factory.override(unsharded);
+
+    assertThat(replacement, theInstance(unsharded));
+  }
+
+  /** A write that never mentions sharding must come back as the very same instance. */
+  @Test
+  public void originalShardingNotSpecifiedReturnsOriginal() throws Exception {
+    File target = tmp.newFile("foo");
+    PTransform<PCollection<String>, PDone> unspecified =
+        TextIO.Write.to(target.getAbsolutePath());
+
+    PTransform<PCollection<String>, PDone> replacement = factory.override(unspecified);
+
+    assertThat(replacement, theInstance(unspecified));
+  }
+
+  /** An explicit request for one shard is overridden and writes a single shard file. */
+  @Test
+  public void originalShardedToOneReturnsExplicitlySharded() throws Exception {
+    verifyExplicitShardingOverride(1, new String[] {"foo", "bar", "baz"});
+  }
+
+  /** An explicit request for three shards is overridden and writes three shard files. */
+  @Test
+  public void originalShardedToManyReturnsExplicitlySharded() throws Exception {
+    verifyExplicitShardingOverride(3, new String[] {"foo", "bar", "baz", "spam", "ham", "eggs"});
+  }
+
+  /**
+   * Overrides a write with the given explicit shard count, runs it on a test pipeline and
+   * checks the files produced under the temporary folder.
+   *
+   * @param numShards shard count requested on the original write and expected in the output
+   * @param contents elements written by the pipeline and expected back from the shard files
+   */
+  private void verifyExplicitShardingOverride(int numShards, String[] contents)
+      throws Exception {
+    File target = tmp.newFile("foo");
+    TextIO.Write.Bound<String> requested =
+        TextIO.Write.to(target.getAbsolutePath()).withNumShards(numShards);
+    PTransform<PCollection<String>, PDone> replacement = factory.override(requested);
+
+    assertThat(
+        replacement, not(Matchers.<PTransform<PCollection<String>, PDone>>equalTo(requested)));
+
+    TestPipeline pipeline = TestPipeline.create();
+    pipeline.apply(Create.<String>of(contents)).apply(replacement);
+
+    // Drop the placeholder created by newFile() before the pipeline writes its shards.
+    target.delete();
+    pipeline.run();
+
+    TextIOTest.assertOutputFiles(
+        contents, StringUtf8Coder.of(), numShards, tmp, "foo", requested.getShardNameTemplate());
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorServicesTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorServicesTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorServicesTest.java
new file mode 100644
index 0000000..7dd5830
--- /dev/null
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorServicesTest.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import static org.hamcrest.Matchers.any;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+
+import com.google.common.util.concurrent.MoreExecutors;
+
+import org.hamcrest.Matchers;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * Tests for {@link TransformExecutorServices}.
+ *
+ * <p>Both flavours are driven through a direct (same-thread) executor service, so a scheduled
+ * {@link TransformExecutor} mock has its {@code call()} invoked before {@code schedule} returns
+ * whenever the service decides to run it.
+ */
+@RunWith(JUnit4.class)
+public class TransformExecutorServicesTest {
+  @Rule public ExpectedException thrown = ExpectedException.none();
+
+  private ExecutorService executorService;
+  private Map<TransformExecutor<?>, Boolean> scheduled;
+
+  /** Resets the bookkeeping map and the direct executor before each test. */
+  @Before
+  public void setup() {
+    scheduled = new ConcurrentHashMap<>();
+    executorService = MoreExecutors.newDirectExecutorService();
+  }
+
+  /** The parallel service runs every scheduled executor at once and forgets each on completion. */
+  @Test
+  public void parallelScheduleMultipleSchedulesBothImmediately() {
+    TransformExecutor<Object> earlier = newMockExecutor();
+    TransformExecutor<Object> later = newMockExecutor();
+    TransformExecutorService service =
+        TransformExecutorServices.parallel(executorService, scheduled);
+
+    service.schedule(earlier);
+    service.schedule(later);
+
+    verify(earlier).call();
+    verify(later).call();
+    assertThat(
+        scheduled,
+        Matchers.allOf(
+            Matchers.<TransformExecutor<?>, Boolean>hasEntry(earlier, true),
+            Matchers.<TransformExecutor<?>, Boolean>hasEntry(later, true)));
+
+    service.complete(earlier);
+    assertTracked(later);
+    assertUntracked(earlier);
+
+    service.complete(later);
+    assertThat(scheduled.isEmpty(), is(true));
+  }
+
+  /** The serial service holds the second executor back until the first has completed. */
+  @Test
+  public void serialScheduleTwoWaitsForFirstToComplete() {
+    TransformExecutor<Object> earlier = newMockExecutor();
+    TransformExecutor<Object> later = newMockExecutor();
+    TransformExecutorService service =
+        TransformExecutorServices.serial(executorService, scheduled);
+
+    service.schedule(earlier);
+    verify(earlier).call();
+
+    service.schedule(later);
+    verify(later, never()).call();
+
+    assertTracked(earlier);
+    assertUntracked(later);
+
+    // Completing the running executor hands the slot to the queued one.
+    service.complete(earlier);
+    verify(later).call();
+    assertTracked(later);
+    assertUntracked(earlier);
+
+    service.complete(later);
+  }
+
+  /** Completing an executor that is not the one currently running is rejected. */
+  @Test
+  public void serialCompleteNotExecutingTaskThrows() {
+    TransformExecutor<Object> running = newMockExecutor();
+    TransformExecutor<Object> stranger = newMockExecutor();
+    TransformExecutorService service =
+        TransformExecutorServices.serial(executorService, scheduled);
+
+    service.schedule(running);
+    thrown.expect(IllegalStateException.class);
+    thrown.expectMessage("unexpected currently executing");
+
+    service.complete(stranger);
+  }
+
+  /** Creates a Mockito mock of {@link TransformExecutor}. */
+  @SuppressWarnings("unchecked")
+  private static TransformExecutor<Object> newMockExecutor() {
+    return mock(TransformExecutor.class);
+  }
+
+  /** Asserts that {@code executor} is recorded in {@link #scheduled} with value {@code true}. */
+  private void assertTracked(TransformExecutor<?> executor) {
+    assertThat(scheduled, Matchers.<TransformExecutor<?>, Boolean>hasEntry(executor, true));
+  }
+
+  /** Asserts that {@link #scheduled} has no entry at all (whatever the value) for the executor. */
+  private void assertUntracked(TransformExecutor<?> executor) {
+    assertThat(
+        scheduled,
+        not(
+            Matchers.<TransformExecutor<?>, Boolean>hasEntry(
+                Matchers.<TransformExecutor<?>>equalTo(executor), any(Boolean.class))));
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
new file mode 100644
index 0000000..959e9d3
--- /dev/null
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
@@ -0,0 +1,538 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.isA;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.when;
+
+import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.util.IllegalMutationException;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+
+import com.google.common.util.concurrent.MoreExecutors;
+
+import org.hamcrest.Matchers;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Tests for {@link TransformExecutor}.
+ */
+@RunWith(JUnit4.class)
+public class TransformExecutorTest {
+ @Rule public ExpectedException thrown = ExpectedException.none();
+ private PCollection<String> created;
+ private PCollection<KV<Integer, String>> downstream;
+
+ private CountDownLatch evaluatorCompleted;
+
+ private RegisteringCompletionCallback completionCallback;
+ private TransformExecutorService transformEvaluationState;
+ private BundleFactory bundleFactory;
+ @Mock private InProcessEvaluationContext evaluationContext;
+ @Mock private TransformEvaluatorRegistry registry;
+ private Map<TransformExecutor<?>, Boolean> scheduled;
+
+ /**
+  * Initialises the Mockito mocks and rebuilds the per-test fixtures: bundle factory, a parallel
+  * executor service backed by a direct executor, the completion latch and callback, and a
+  * two-step pipeline ({@code Create} followed by {@code WithKeys}).
+  */
+ @Before
+ public void setup() {
+   MockitoAnnotations.initMocks(this);
+
+   this.bundleFactory = InProcessBundleFactory.create();
+
+   // The parallel service records its executors in this map; the tests assert against it.
+   this.scheduled = new HashMap<>();
+   this.transformEvaluationState =
+       TransformExecutorServices.parallel(MoreExecutors.newDirectExecutorService(), scheduled);
+
+   // One count: the latch is handed to the completion callback so tests can await it.
+   this.evaluatorCompleted = new CountDownLatch(1);
+   this.completionCallback = new RegisteringCompletionCallback(evaluatorCompleted);
+
+   TestPipeline pipeline = TestPipeline.create();
+   this.created = pipeline.apply(Create.of("foo", "spam", "third"));
+   this.downstream = created.apply(WithKeys.<Integer, String>of(3));
+ }
+
+ /**
+  * With a {@code null} input bundle the executor must skip element processing, call
+  * {@code finishBundle()} once, and report that result (and no throwable) to the callback.
+  */
+ @Test
+ public void callWithNullInputBundleFinishesBundleAndCompletes() throws Exception {
+   final InProcessTransformResult expectedResult =
+       StepTransformResult.withoutHold(created.getProducingTransformInternal()).build();
+   final AtomicBoolean finished = new AtomicBoolean(false);
+
+   TransformEvaluator<Object> rootEvaluator =
+       new TransformEvaluator<Object>() {
+         @Override
+         public InProcessTransformResult finishBundle() throws Exception {
+           finished.set(true);
+           return expectedResult;
+         }
+
+         @Override
+         public void processElement(WindowedValue<Object> element) throws Exception {
+           // There is no input bundle, so no element may ever reach the evaluator.
+           throw new IllegalArgumentException("Shouldn't be called");
+         }
+       };
+   when(registry.forApplication(created.getProducingTransformInternal(), null, evaluationContext))
+       .thenReturn(rootEvaluator);
+
+   TransformExecutor<Object> underTest =
+       TransformExecutor.create(
+           registry,
+           Collections.<ModelEnforcementFactory>emptyList(),
+           evaluationContext,
+           null,
+           created.getProducingTransformInternal(),
+           completionCallback,
+           transformEvaluationState);
+
+   // Invoked directly on the test thread; no latch is needed.
+   underTest.call();
+
+   assertThat(finished.get(), is(true));
+   assertThat(completionCallback.handledResult, equalTo(expectedResult));
+   assertThat(completionCallback.handledThrowable, is(nullValue()));
+   assertThat(scheduled, not(Matchers.<TransformExecutor<?>>hasKey(underTest)));
+ }
+
+ @Test
+ public void inputBundleProcessesEachElementFinishesAndCompletes() throws Exception {
+ final InProcessTransformResult result =
+ StepTransformResult.withoutHold(downstream.getProducingTransformInternal()).build();
+ final Collection<WindowedValue<String>> elementsProcessed = new ArrayList<>();
+ TransformEvaluator<String> evaluator =
+ new TransformEvaluator<String>() {
+ @Override
+ public void processElement(WindowedValue<String> element) throws Exception {
+ elementsProcessed.add(element);
+ return;
+ }
+
+ @Override
+ public InProcessTransformResult finishBundle() throws Exception {
+ return result;
+ }
+ };
+
+ WindowedValue<String> foo = WindowedValue.valueInGlobalWindow("foo");
+ WindowedValue<String> spam = WindowedValue.valueInGlobalWindow("spam");
+ WindowedValue<String> third = WindowedValue.valueInGlobalWindow("third");
+ CommittedBundle<String> inputBundle =
+ bundleFactory.createRootBundle(created).add(foo).add(spam).add(third).commit(Instant.now());
+ when(
+ registry.<String>forApplication(
+ downstream.getProducingTransformInternal(), inputBundle, evaluationContext))
+ .thenReturn(evaluator);
+
+ TransformExecutor<String> executor =
+ TransformExecutor.create(
+ registry,
+ Collections.<ModelEnforcementFactory>emptyList(),
+ evaluationContext,
+ inputBundle,
+ downstream.getProducingTransformInternal(),
+ completionCallback,
+ transformEvaluationState);
+
+ Executors.newSingleThreadExecutor().submit(executor);
+
+ evaluatorCompleted.await();
+
+ assertThat(elementsProcessed, containsInAnyOrder(spam, third, foo));
+ assertThat(completionCallback.handledResult, equalTo(result));
+ assertThat(completionCallback.handledThrowable, is(nullValue()));
+ assertThat(scheduled, not(Matchers.<TransformExecutor<?>>hasKey(executor)));
+ }
+
+ @Test
+ public void processElementThrowsExceptionCallsback() throws Exception {
+ final InProcessTransformResult result =
+ StepTransformResult.withoutHold(downstream.getProducingTransformInternal()).build();
+ final Exception exception = new Exception();
+ TransformEvaluator<String> evaluator =
+ new TransformEvaluator<String>() {
+ @Override
+ public void processElement(WindowedValue<String> element) throws Exception {
+ throw exception;
+ }
+
+ @Override
+ public InProcessTransformResult finishBundle() throws Exception {
+ return result;
+ }
+ };
+
+ WindowedValue<String> foo = WindowedValue.valueInGlobalWindow("foo");
+ CommittedBundle<String> inputBundle =
+ bundleFactory.createRootBundle(created).add(foo).commit(Instant.now());
+ when(
+ registry.<String>forApplication(
+ downstream.getProducingTransformInternal(), inputBundle, evaluationContext))
+ .thenReturn(evaluator);
+
+ TransformExecutor<String> executor =
+ TransformExecutor.create(
+ registry,
+ Collections.<ModelEnforcementFactory>emptyList(),
+ evaluationContext,
+ inputBundle,
+ downstream.getProducingTransformInternal(),
+ completionCallback,
+ transformEvaluationState);
+ Executors.newSingleThreadExecutor().submit(executor);
+
+ evaluatorCompleted.await();
+
+ assertThat(completionCallback.handledResult, is(nullValue()));
+ assertThat(completionCallback.handledThrowable, Matchers.<Throwable>equalTo(exception));
+ assertThat(scheduled, not(Matchers.<TransformExecutor<?>>hasKey(executor)));
+ }
+
+ @Test
+ public void finishBundleThrowsExceptionCallsback() throws Exception {
+ final Exception exception = new Exception();
+ TransformEvaluator<String> evaluator =
+ new TransformEvaluator<String>() {
+ @Override
+ public void processElement(WindowedValue<String> element) throws Exception {}
+
+ @Override
+ public InProcessTransformResult finishBundle() throws Exception {
+ throw exception;
+ }
+ };
+
+ CommittedBundle<String> inputBundle =
+ bundleFactory.createRootBundle(created).commit(Instant.now());
+ when(
+ registry.<String>forApplication(
+ downstream.getProducingTransformInternal(), inputBundle, evaluationContext))
+ .thenReturn(evaluator);
+
+ TransformExecutor<String> executor =
+ TransformExecutor.create(
+ registry,
+ Collections.<ModelEnforcementFactory>emptyList(),
+ evaluationContext,
+ inputBundle,
+ downstream.getProducingTransformInternal(),
+ completionCallback,
+ transformEvaluationState);
+ Executors.newSingleThreadExecutor().submit(executor);
+
+ evaluatorCompleted.await();
+
+ assertThat(completionCallback.handledResult, is(nullValue()));
+ assertThat(completionCallback.handledThrowable, Matchers.<Throwable>equalTo(exception));
+ assertThat(scheduled, not(Matchers.<TransformExecutor<?>>hasKey(executor)));
+ }
+
+ @Test
+ public void duringCallGetThreadIsNonNull() throws Exception {
+ final InProcessTransformResult result =
+ StepTransformResult.withoutHold(downstream.getProducingTransformInternal()).build();
+ final CountDownLatch testLatch = new CountDownLatch(1);
+ final CountDownLatch evaluatorLatch = new CountDownLatch(1);
+ TransformEvaluator<Object> evaluator =
+ new TransformEvaluator<Object>() {
+ @Override
+ public void processElement(WindowedValue<Object> element) throws Exception {
+ throw new IllegalArgumentException("Shouldn't be called");
+ }
+
+ @Override
+ public InProcessTransformResult finishBundle() throws Exception {
+ testLatch.countDown();
+ evaluatorLatch.await();
+ return result;
+ }
+ };
+
+ when(registry.forApplication(created.getProducingTransformInternal(), null, evaluationContext))
+ .thenReturn(evaluator);
+
+ TransformExecutor<String> executor =
+ TransformExecutor.create(
+ registry,
+ Collections.<ModelEnforcementFactory>emptyList(),
+ evaluationContext,
+ null,
+ created.getProducingTransformInternal(),
+ completionCallback,
+ transformEvaluationState);
+
+ Executors.newSingleThreadExecutor().submit(executor);
+ testLatch.await();
+ assertThat(executor.getThread(), not(nullValue()));
+
+ // Finish the execution so everything can get closed down cleanly.
+ evaluatorLatch.countDown();
+ }
+
+ @Test // Every element and the bundle result must be shown to the configured enforcement.
+ public void callWithEnforcementAppliesEnforcement() throws Exception {
+ final InProcessTransformResult bundleResult =
+ StepTransformResult.withoutHold(downstream.getProducingTransformInternal()).build();
+
+ TransformEvaluator<Object> noOpEvaluator =
+ new TransformEvaluator<Object>() {
+ @Override
+ public void processElement(WindowedValue<Object> element) throws Exception {
+ }
+
+ @Override
+ public InProcessTransformResult finishBundle() throws Exception {
+ return bundleResult;
+ }
+ };
+
+ WindowedValue<String> first = WindowedValue.valueInGlobalWindow("foo");
+ WindowedValue<String> second = WindowedValue.valueInGlobalWindow("bar");
+ CommittedBundle<String> inputBundle =
+ bundleFactory.createRootBundle(created).add(first).add(second).commit(Instant.now());
+ when(
+ registry.forApplication(
+ downstream.getProducingTransformInternal(), inputBundle, evaluationContext))
+ .thenReturn(noOpEvaluator);
+
+ TestEnforcementFactory enforcementFactory = new TestEnforcementFactory();
+ TransformExecutor<String> executor =
+ TransformExecutor.create(
+ registry,
+ Collections.<ModelEnforcementFactory>singleton(enforcementFactory),
+ evaluationContext,
+ inputBundle,
+ downstream.getProducingTransformInternal(),
+ completionCallback,
+ transformEvaluationState);
+
+ executor.call();
+ TestEnforcement<?> recorded = enforcementFactory.instance;
+ assertThat(
+ recorded.beforeElements,
+ Matchers.<WindowedValue<?>>containsInAnyOrder(first, second));
+ assertThat(
+ recorded.afterElements,
+ Matchers.<WindowedValue<?>>containsInAnyOrder(first, second));
+ assertThat(recorded.finishedBundles, contains(bundleResult));
+ }
+
+ @Test
+ public void callWithEnforcementThrowsOnFinishPropagates() throws Exception {
+ PCollection<byte[]> pcBytes =
+ created.apply(
+ new PTransform<PCollection<String>, PCollection<byte[]>>() {
+ @Override
+ public PCollection<byte[]> apply(PCollection<String> input) {
+ return PCollection.<byte[]>createPrimitiveOutputInternal(
+ input.getPipeline(), input.getWindowingStrategy(), input.isBounded())
+ .setCoder(ByteArrayCoder.of());
+ }
+ });
+
+ final InProcessTransformResult result =
+ StepTransformResult.withoutHold(pcBytes.getProducingTransformInternal()).build();
+ final CountDownLatch testLatch = new CountDownLatch(1);
+ final CountDownLatch evaluatorLatch = new CountDownLatch(1);
+
+ TransformEvaluator<Object> evaluator =
+ new TransformEvaluator<Object>() {
+ @Override
+ public void processElement(WindowedValue<Object> element) throws Exception {}
+
+ @Override
+ public InProcessTransformResult finishBundle() throws Exception {
+ testLatch.countDown();
+ evaluatorLatch.await();
+ return result;
+ }
+ };
+
+ WindowedValue<byte[]> fooBytes = WindowedValue.valueInGlobalWindow("foo".getBytes());
+ CommittedBundle<byte[]> inputBundle =
+ bundleFactory.createRootBundle(pcBytes).add(fooBytes).commit(Instant.now());
+ when(
+ registry.forApplication(
+ pcBytes.getProducingTransformInternal(), inputBundle, evaluationContext))
+ .thenReturn(evaluator);
+
+ TransformExecutor<byte[]> executor =
+ TransformExecutor.create(
+ registry,
+ Collections.<ModelEnforcementFactory>singleton(ImmutabilityEnforcementFactory.create()),
+ evaluationContext,
+ inputBundle,
+ pcBytes.getProducingTransformInternal(),
+ completionCallback,
+ transformEvaluationState);
+
+ Future<InProcessTransformResult> task = Executors.newSingleThreadExecutor().submit(executor);
+ testLatch.await();
+ fooBytes.getValue()[0] = 'b';
+ evaluatorLatch.countDown();
+
+ thrown.expectCause(isA(IllegalMutationException.class));
+ task.get();
+ }
+
+ @Test
+ public void callWithEnforcementThrowsOnElementPropagates() throws Exception {
+ PCollection<byte[]> pcBytes =
+ created.apply(
+ new PTransform<PCollection<String>, PCollection<byte[]>>() {
+ @Override
+ public PCollection<byte[]> apply(PCollection<String> input) {
+ return PCollection.<byte[]>createPrimitiveOutputInternal(
+ input.getPipeline(), input.getWindowingStrategy(), input.isBounded())
+ .setCoder(ByteArrayCoder.of());
+ }
+ });
+
+ final InProcessTransformResult result =
+ StepTransformResult.withoutHold(pcBytes.getProducingTransformInternal()).build();
+ final CountDownLatch testLatch = new CountDownLatch(1);
+ final CountDownLatch evaluatorLatch = new CountDownLatch(1);
+
+ TransformEvaluator<Object> evaluator =
+ new TransformEvaluator<Object>() {
+ @Override
+ public void processElement(WindowedValue<Object> element) throws Exception {
+ testLatch.countDown();
+ evaluatorLatch.await();
+ }
+
+ @Override
+ public InProcessTransformResult finishBundle() throws Exception {
+ return result;
+ }
+ };
+
+ WindowedValue<byte[]> fooBytes = WindowedValue.valueInGlobalWindow("foo".getBytes());
+ CommittedBundle<byte[]> inputBundle =
+ bundleFactory.createRootBundle(pcBytes).add(fooBytes).commit(Instant.now());
+ when(
+ registry.forApplication(
+ pcBytes.getProducingTransformInternal(), inputBundle, evaluationContext))
+ .thenReturn(evaluator);
+
+ TransformExecutor<byte[]> executor =
+ TransformExecutor.create(
+ registry,
+ Collections.<ModelEnforcementFactory>singleton(ImmutabilityEnforcementFactory.create()),
+ evaluationContext,
+ inputBundle,
+ pcBytes.getProducingTransformInternal(),
+ completionCallback,
+ transformEvaluationState);
+
+ Future<InProcessTransformResult> task = Executors.newSingleThreadExecutor().submit(executor);
+ testLatch.await();
+ fooBytes.getValue()[0] = 'b';
+ evaluatorLatch.countDown();
+
+ thrown.expectCause(isA(IllegalMutationException.class));
+ task.get();
+ }
+
+ private static class RegisteringCompletionCallback implements CompletionCallback {
+ private InProcessTransformResult handledResult = null; // last result given to handleResult
+ private Throwable handledThrowable = null; // last throwable given to handleThrowable
+ private final CountDownLatch onMethod; // counted down once per callback invocation
+ // Records the latest callback argument; onMethod lets a waiting thread observe the call.
+ private RegisteringCompletionCallback(CountDownLatch onMethod) {
+ this.onMethod = onMethod;
+ }
+
+ @Override
+ public CommittedResult handleResult(
+ CommittedBundle<?> inputBundle, InProcessTransformResult result) {
+ handledResult = result; // stored before countDown so a thread leaving await() sees it
+ onMethod.countDown();
+ return CommittedResult.create(result, Collections.<CommittedBundle<?>>emptyList());
+ }
+
+ @Override
+ public void handleThrowable(CommittedBundle<?> inputBundle, Throwable t) {
+ handledThrowable = t; // stored before countDown so a thread leaving await() sees it
+ onMethod.countDown();
+ }
+ }
+
+ private static class TestEnforcementFactory implements ModelEnforcementFactory {
+ private TestEnforcement<?> instance; // enforcement built by the latest forBundle call
+ @Override
+ public <T> TestEnforcement<T> forBundle(
+ CommittedBundle<T> input, AppliedPTransform<?, ?, ?> consumer) {
+ final TestEnforcement<T> created = new TestEnforcement<T>();
+ this.instance = created;
+ return created;
+ }
+ }
+
+ private static class TestEnforcement<T> implements ModelEnforcement<T> {
+ private final List<WindowedValue<T>> beforeElements = new ArrayList<WindowedValue<T>>();
+ private final List<WindowedValue<T>> afterElements = new ArrayList<WindowedValue<T>>();
+ private final List<InProcessTransformResult> finishedBundles = new ArrayList<>();
+ // Each hook appends its argument to the matching list so tests can inspect the calls.
+ @Override
+ public void beforeElement(WindowedValue<T> value) {
+ this.beforeElements.add(value);
+ }
+
+ @Override
+ public void afterElement(WindowedValue<T> value) {
+ this.afterElements.add(value);
+ }
+
+ @Override
+ public void afterFinish(
+ CommittedBundle<T> input,
+ InProcessTransformResult result,
+ Iterable<? extends CommittedBundle<?>> outputs) {
+ this.finishedBundles.add(result);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
new file mode 100644
index 0000000..9f3909e
--- /dev/null
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactoryTest.java
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.emptyIterable;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.BigEndianLongCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.io.CountingSource;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.io.UnboundedSource;
+import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollection;
+
+import com.google.common.collect.ImmutableList;
+
+import org.hamcrest.Matchers;
+import org.joda.time.DateTime;
+import org.joda.time.Instant;
+import org.joda.time.ReadableInstant;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import javax.annotation.Nullable;
+/**
+ * Tests for {@link UnboundedReadEvaluatorFactory}.
+ */
+@RunWith(JUnit4.class)
+public class UnboundedReadEvaluatorFactoryTest {
+ private PCollection<Long> longs; // output of Read.from(...) on a CountingSource, see setup()
+ private TransformEvaluatorFactory factory; // an UnboundedReadEvaluatorFactory, see setup()
+ private InProcessEvaluationContext context; // Mockito mock, see setup()
+ private UncommittedBundle<Long> output; // bundle the mocked context returns for longs
+
+ private BundleFactory bundleFactory = InProcessBundleFactory.create();
+
+ @Before // Rebuilds the factory, mocked context, read PCollection and its root bundle.
+ public void setup() {
+ factory = new UnboundedReadEvaluatorFactory();
+ context = mock(InProcessEvaluationContext.class);
+
+ TestPipeline pipeline = TestPipeline.create();
+ UnboundedSource<Long, ?> countingSource =
+ CountingSource.unboundedWithTimestampFn(new LongToInstantFn());
+ longs = pipeline.apply(Read.from(countingSource));
+ output = bundleFactory.createRootBundle(longs);
+ when(context.createRootBundle(longs)).thenReturn(output);
+ }
+
+ @Test // One evaluator run is expected to emit 0..9, each timestamped via tgw.
+ public void unboundedSourceInMemoryTransformEvaluatorProducesElements() throws Exception {
+ AppliedPTransform<?, ?, ?> read = longs.getProducingTransformInternal();
+ TransformEvaluator<?> readEvaluator = factory.forApplication(read, null, context);
+
+ InProcessTransformResult result = readEvaluator.finishBundle();
+ ReadableInstant hold = result.getWatermarkHold();
+ assertThat(hold, Matchers.<ReadableInstant>lessThan(DateTime.now().toInstant()));
+ assertThat(
+ output.commit(Instant.now()).getElements(),
+ containsInAnyOrder(
+ tgw(0L), tgw(1L), tgw(2L), tgw(3L), tgw(4L), tgw(5L), tgw(6L), tgw(7L), tgw(8L),
+ tgw(9L)));
+ }
+
+ /**
+ * Demonstrates that evaluators created one after another keep producing new elements: the test
+ * expects 0-9 from the first evaluator and 10-19 from the second.
+ */
+ @Test
+ public void unboundedSourceInMemoryTransformEvaluatorMultipleSequentialCalls() throws Exception {
+ TransformEvaluator<?> evaluator =
+ factory.forApplication(longs.getProducingTransformInternal(), null, context);
+
+ InProcessTransformResult result = evaluator.finishBundle();
+ assertThat(
+ result.getWatermarkHold(), Matchers.<ReadableInstant>lessThan(DateTime.now().toInstant()));
+ assertThat(
+ output.commit(Instant.now()).getElements(),
+ containsInAnyOrder(
+ tgw(1L), tgw(2L), tgw(4L), tgw(8L), tgw(9L), tgw(7L), tgw(6L), tgw(5L), tgw(3L),
+ tgw(0L)));
+ // Re-stub the context so the second evaluator is handed a fresh, uncommitted bundle.
+ UncommittedBundle<Long> secondOutput = bundleFactory.createRootBundle(longs);
+ when(context.createRootBundle(longs)).thenReturn(secondOutput);
+ TransformEvaluator<?> secondEvaluator =
+ factory.forApplication(longs.getProducingTransformInternal(), null, context);
+ InProcessTransformResult secondResult = secondEvaluator.finishBundle();
+ assertThat(
+ secondResult.getWatermarkHold(),
+ Matchers.<ReadableInstant>lessThan(DateTime.now().toInstant()));
+ assertThat(
+ secondOutput.commit(Instant.now()).getElements(),
+ containsInAnyOrder(tgw(11L), tgw(12L), tgw(14L), tgw(18L), tgw(19L), tgw(17L), tgw(16L),
+ tgw(15L), tgw(13L), tgw(10L)));
+ }
+
+ @Test // Expects three committed elements and exactly one reader close.
+ public void boundedSourceEvaluatorClosesReader() throws Exception {
+ TestUnboundedSource<Long> threeLongs =
+ new TestUnboundedSource<>(BigEndianLongCoder.of(), 1L, 2L, 3L);
+
+ TestPipeline pipeline = TestPipeline.create();
+ PCollection<Long> read = pipeline.apply(Read.from(threeLongs));
+ AppliedPTransform<?, ?, ?> readTransform = read.getProducingTransformInternal();
+
+ UncommittedBundle<Long> readOutput = bundleFactory.createRootBundle(read);
+ when(context.createRootBundle(read)).thenReturn(readOutput);
+
+ TransformEvaluator<?> readEvaluator = factory.forApplication(readTransform, null, context);
+ readEvaluator.finishBundle();
+ CommittedBundle<Long> committed = readOutput.commit(Instant.now());
+ assertThat(ImmutableList.copyOf(committed.getElements()), hasSize(3));
+ assertThat(TestUnboundedSource.readerClosedCount, equalTo(1));
+ }
+
+ @Test // Expects an empty committed bundle and still exactly one reader close.
+ public void boundedSourceEvaluatorNoElementsClosesReader() throws Exception {
+ TestUnboundedSource<Long> emptySource = new TestUnboundedSource<>(BigEndianLongCoder.of());
+
+ TestPipeline pipeline = TestPipeline.create();
+ PCollection<Long> read = pipeline.apply(Read.from(emptySource));
+ AppliedPTransform<?, ?, ?> readTransform = read.getProducingTransformInternal();
+
+ UncommittedBundle<Long> readOutput = bundleFactory.createRootBundle(read);
+ when(context.createRootBundle(read)).thenReturn(readOutput);
+
+ TransformEvaluator<?> readEvaluator = factory.forApplication(readTransform, null, context);
+ readEvaluator.finishBundle();
+ CommittedBundle<Long> committed = readOutput.commit(Instant.now());
+ assertThat(committed.getElements(), emptyIterable());
+ assertThat(TestUnboundedSource.readerClosedCount, equalTo(1));
+ }
+
+ // TODO: Once the source is split into multiple sources before evaluating, this test will have to
+ // be updated.
+ /**
+ * Demonstrates that only a single unfinished instance of TransformEvaluator can be created at a
+ * time; the second evaluator requested meanwhile is expected to behave as an empty evaluator.
+ */
+ @Test
+ public void unboundedSourceWithMultipleSimultaneousEvaluatorsIndependent() throws Exception {
+ UncommittedBundle<Long> secondOutput = bundleFactory.createRootBundle(longs);
+ // NOTE(review): secondOutput is never stubbed into context; its emptiness check may be vacuous.
+ TransformEvaluator<?> evaluator =
+ factory.forApplication(longs.getProducingTransformInternal(), null, context);
+
+ TransformEvaluator<?> secondEvaluator =
+ factory.forApplication(longs.getProducingTransformInternal(), null, context);
+ // The evaluator created second is finished first; the first one must still produce 0-9.
+ InProcessTransformResult secondResult = secondEvaluator.finishBundle();
+ InProcessTransformResult result = evaluator.finishBundle();
+
+ assertThat(
+ result.getWatermarkHold(), Matchers.<ReadableInstant>lessThan(DateTime.now().toInstant()));
+ assertThat(
+ output.commit(Instant.now()).getElements(),
+ containsInAnyOrder(
+ tgw(1L), tgw(2L), tgw(4L), tgw(8L), tgw(9L), tgw(7L), tgw(6L), tgw(5L), tgw(3L),
+ tgw(0L)));
+
+ assertThat(secondResult.getWatermarkHold(), equalTo(BoundedWindow.TIMESTAMP_MIN_VALUE));
+ assertThat(secondOutput.commit(Instant.now()).getElements(), emptyIterable());
+ }
+
+ /**
+ * Shorthand for a long in the {@link GlobalWindow} whose timestamp is the epoch offset by it.
+ */
+ private static WindowedValue<Long> tgw(Long elem) {
+ Instant timestamp = new Instant(elem);
+ return WindowedValue.timestampedValueInGlobalWindow(elem, timestamp);
+ }
+
+ private static class LongToInstantFn implements SerializableFunction<Long, Instant> {
+ @Override
+ public Instant apply(Long value) {
+ return new Instant(value); // the element itself is used to build its timestamp
+ }
+ }
+
+ private static class TestUnboundedSource<T> extends UnboundedSource<T, TestCheckpointMark> {
+ // Shared by all instances: zeroed by every constructor call, bumped by each reader close().
+ static int readerClosedCount;
+ private final Coder<T> coder;
+ private final List<T> elems;
+
+ public TestUnboundedSource(Coder<T> coder, T... elems) {
+ TestUnboundedSource.readerClosedCount = 0;
+ this.coder = coder;
+ this.elems = Arrays.asList(elems);
+ }
+
+ @Override
+ public List<? extends UnboundedSource<T, TestCheckpointMark>> generateInitialSplits(
+ int desiredNumSplits, PipelineOptions options) throws Exception {
+ return ImmutableList.of(this);
+ }
+
+ @Override
+ public UnboundedSource.UnboundedReader<T> createReader(
+ PipelineOptions options, TestCheckpointMark checkpointMark) {
+ return new TestUnboundedReader(this.elems);
+ }
+
+ @Override
+ @Nullable
+ public Coder<TestCheckpointMark> getCheckpointMarkCoder() {
+ return new TestCheckpointMark.Coder();
+ }
+
+ @Override
+ public void validate() {}
+
+ @Override
+ public Coder<T> getDefaultOutputCoder() {
+ return this.coder;
+ }
+
+ // Reader that walks the element list once and counts every close() on the source class.
+ private class TestUnboundedReader extends UnboundedReader<T> {
+ private final List<T> elems;
+ private int index = -1; // position of the current element; -1 until the first advance
+
+ public TestUnboundedReader(List<T> elems) {
+ this.elems = elems;
+ }
+
+ @Override
+ public boolean start() throws IOException {
+ return advance();
+ }
+
+ @Override
+ public boolean advance() throws IOException {
+ if (index + 1 >= elems.size()) {
+ return false;
+ }
+ index++;
+ return true;
+ }
+
+ @Override
+ public Instant getWatermark() {
+ return Instant.now();
+ }
+
+ @Override
+ public CheckpointMark getCheckpointMark() {
+ return new TestCheckpointMark();
+ }
+
+ @Override
+ public UnboundedSource<T, ?> getCurrentSource() {
+ return TestUnboundedSource.this;
+ }
+
+ @Override
+ public T getCurrent() throws NoSuchElementException {
+ return elems.get(index);
+ }
+
+ @Override
+ public Instant getCurrentTimestamp() throws NoSuchElementException {
+ return Instant.now();
+ }
+
+ @Override
+ public void close() throws IOException {
+ readerClosedCount += 1;
+ }
+ }
+ }
+
+ private static class TestCheckpointMark implements CheckpointMark {
+ @Override
+ public void finalizeCheckpoint() throws IOException {} // nothing to finalize
+ // Coder for the field-less mark: encode writes nothing, decode returns a new instance.
+ public static class Coder extends AtomicCoder<TestCheckpointMark> {
+ @Override
+ public void encode(
+ TestCheckpointMark mark,
+ OutputStream out,
+ org.apache.beam.sdk.coders.Coder.Context codingContext)
+ throws CoderException, IOException {}
+
+ @Override
+ public TestCheckpointMark decode(
+ InputStream in, org.apache.beam.sdk.coders.Coder.Context codingContext)
+ throws CoderException, IOException {
+ return new TestCheckpointMark();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
new file mode 100644
index 0000000..859418b
--- /dev/null
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import org.apache.beam.runners.direct.InProcessPipelineRunner.PCollectionViewWriter;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
+import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.util.PCollectionViews;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+
+import com.google.common.collect.ImmutableList;
+
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link ViewEvaluatorFactory}.
+ */
+@RunWith(JUnit4.class)
+public class ViewEvaluatorFactoryTest {
+ private BundleFactory bundleFactory = InProcessBundleFactory.create();
+
+ @Test // The view writer must receive the elements at finishBundle, not at processElement.
+ public void testInMemoryEvaluator() throws Exception {
+ TestPipeline p = TestPipeline.create();
+ // Pipeline: input -> key everything by null -> group -> values -> WriteView.
+ PCollection<String> input = p.apply(Create.of("foo", "bar"));
+ CreatePCollectionView<String, Iterable<String>> createView =
+ CreatePCollectionView.of(
+ PCollectionViews.iterableView(p, input.getWindowingStrategy(), StringUtf8Coder.of()));
+ PCollection<Iterable<String>> concat =
+ input.apply(WithKeys.<Void, String>of((Void) null))
+ .setCoder(KvCoder.of(VoidCoder.of(), StringUtf8Coder.of()))
+ .apply(GroupByKey.<Void, String>create())
+ .apply(Values.<Iterable<String>>create());
+ PCollectionView<Iterable<String>> view =
+ concat.apply(new ViewEvaluatorFactory.WriteView<>(createView));
+ // The mocked context hands out a writer that just remembers the last values added.
+ InProcessEvaluationContext context = mock(InProcessEvaluationContext.class);
+ TestViewWriter<String, Iterable<String>> viewWriter = new TestViewWriter<>();
+ when(context.createPCollectionViewWriter(concat, view)).thenReturn(viewWriter);
+
+ CommittedBundle<String> inputBundle =
+ bundleFactory.createRootBundle(input).commit(Instant.now());
+ TransformEvaluator<Iterable<String>> evaluator =
+ new ViewEvaluatorFactory()
+ .forApplication(view.getProducingTransformInternal(), inputBundle, context);
+ // Processing an element on its own is expected to leave the writer untouched.
+ evaluator.processElement(
+ WindowedValue.<Iterable<String>>valueInGlobalWindow(ImmutableList.of("foo", "bar")));
+ assertThat(viewWriter.latest, nullValue());
+ // Finishing the bundle is expected to hand both values to the writer.
+ evaluator.finishBundle();
+ assertThat(
+ viewWriter.latest,
+ containsInAnyOrder(
+ WindowedValue.valueInGlobalWindow("foo"), WindowedValue.valueInGlobalWindow("bar")));
+ }
+
+ private static class TestViewWriter<ElemT, ViewT> implements PCollectionViewWriter<ElemT, ViewT> {
+ private Iterable<WindowedValue<ElemT>> latest; // last argument to add(); null before that
+
+ @Override
+ public void add(Iterable<WindowedValue<ElemT>> values) {
+ latest = values;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkCallbackExecutorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkCallbackExecutorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkCallbackExecutorTest.java
new file mode 100644
index 0000000..d47cf5e
--- /dev/null
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkCallbackExecutorTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.PCollection;
+
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Unit tests for {@link WatermarkCallbackExecutor}.
+ */
+@RunWith(JUnit4.class)
+public class WatermarkCallbackExecutorTest {
+ private WatermarkCallbackExecutor executor = WatermarkCallbackExecutor.create();
+ private AppliedPTransform<?, ?, ?> create;
+ private AppliedPTransform<?, ?, ?> sum;
+
+ @Before // Builds a two-step pipeline so tests have a step and an unrelated step to use.
+ public void setup() {
+ TestPipeline pipeline = TestPipeline.create();
+ PCollection<Integer> ints = pipeline.apply(Create.of(1, 2, 3));
+ this.create = ints.getProducingTransformInternal();
+ this.sum = ints.apply(Sum.integersGlobally()).getProducingTransformInternal();
+ }
+
+ @Test // A global-window callback is expected to run once the watermark reaches its maximum.
+ public void onGuaranteedFiringFiresAfterTrigger() throws Exception {
+ CountDownLatch fired = new CountDownLatch(1);
+ executor.callOnGuaranteedFiring(
+ create,
+ GlobalWindow.INSTANCE,
+ WindowingStrategy.globalDefault(),
+ new CountDownLatchCallback(fired));
+
+ executor.fireForWatermark(create, BoundedWindow.TIMESTAMP_MAX_VALUE);
+ assertThat(fired.await(500, TimeUnit.MILLISECONDS), equalTo(true));
+ }
+
+ @Test // Two callbacks on the same window are both expected to run at the window's end.
+ public void multipleCallbacksShouldFireFires() throws Exception {
+ CountDownLatch bothFired = new CountDownLatch(2);
+ WindowFn<Object, IntervalWindow> tenMinutes = FixedWindows.of(Duration.standardMinutes(10));
+ Instant windowEnd = new Instant(0L).plus(Duration.standardMinutes(10));
+ IntervalWindow window = new IntervalWindow(new Instant(0L), windowEnd);
+ executor.callOnGuaranteedFiring(
+ create, window, WindowingStrategy.of(tenMinutes), new CountDownLatchCallback(bothFired));
+ executor.callOnGuaranteedFiring(
+ create, window, WindowingStrategy.of(tenMinutes), new CountDownLatchCallback(bothFired));
+
+ executor.fireForWatermark(create, windowEnd);
+ assertThat(bothFired.await(500, TimeUnit.MILLISECONDS), equalTo(true));
+ }
+
+ @Test // A watermark halfway through the window is expected to leave the callback unfired.
+ public void noCallbacksShouldFire() throws Exception {
+ CountDownLatch fired = new CountDownLatch(1);
+ WindowFn<Object, IntervalWindow> tenMinutes = FixedWindows.of(Duration.standardMinutes(10));
+ Instant windowEnd = new Instant(0L).plus(Duration.standardMinutes(10));
+ IntervalWindow window = new IntervalWindow(new Instant(0L), windowEnd);
+ executor.callOnGuaranteedFiring(
+ create, window, WindowingStrategy.of(tenMinutes), new CountDownLatchCallback(fired));
+
+ executor.fireForWatermark(create, new Instant(0L).plus(Duration.standardMinutes(5)));
+ assertThat(fired.await(500, TimeUnit.MILLISECONDS), equalTo(false));
+ }
+
+ @Test // A callback registered on sum is expected to stay silent when only create advances.
+ public void unrelatedStepShouldNotFire() throws Exception {
+ CountDownLatch fired = new CountDownLatch(1);
+ WindowFn<Object, IntervalWindow> tenMinutes = FixedWindows.of(Duration.standardMinutes(10));
+ Instant windowEnd = new Instant(0L).plus(Duration.standardMinutes(10));
+ IntervalWindow window = new IntervalWindow(new Instant(0L), windowEnd);
+ executor.callOnGuaranteedFiring(
+ sum, window, WindowingStrategy.of(tenMinutes), new CountDownLatchCallback(fired));
+
+ executor.fireForWatermark(create, new Instant(0L).plus(Duration.standardMinutes(20)));
+ assertThat(fired.await(500, TimeUnit.MILLISECONDS), equalTo(false));
+ }
+
+ private static class CountDownLatchCallback implements Runnable {
+ private final CountDownLatch toCountDown;
+
+ public CountDownLatchCallback(CountDownLatch latch) {
+ this.toCountDown = latch;
+ }
+
+ @Override
+ public void run() {
+ this.toCountDown.countDown();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java
new file mode 100644
index 0000000..64eb8ea
--- /dev/null
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.when;
+
+import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.windowing.AfterPane;
+import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.transforms.windowing.Window.Bound;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollection;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+
+import org.hamcrest.Matchers;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+/**
+ * Tests for {@link WindowEvaluatorFactory}.
+ *
+ * <p>Each test pushes the same three elements ({@code first}, {@code second} and {@code third})
+ * through an evaluator obtained from the factory, then inspects the single output bundle.
+ */
+@RunWith(JUnit4.class)
+public class WindowEvaluatorFactoryTest {
+  private static final Instant EPOCH = new Instant(0);
+
+  private PCollection<Long> input;
+  private WindowEvaluatorFactory factory;
+
+  @Mock private InProcessEvaluationContext evaluationContext;
+
+  private BundleFactory bundleFactory;
+
+  // In the global window, two milliseconds after the epoch.
+  private WindowedValue<Long> first =
+      WindowedValue.timestampedValueInGlobalWindow(3L, new Instant(2L));
+  // In the global window, three days after the epoch.
+  private WindowedValue<Long> second =
+      WindowedValue.timestampedValueInGlobalWindow(1L, EPOCH.plus(Duration.standardDays(3)));
+  // Already assigned to an interval window that ends at the epoch.
+  private WindowedValue<Long> third =
+      WindowedValue.of(
+          2L, new Instant(-10L), new IntervalWindow(new Instant(-100), EPOCH), PaneInfo.NO_FIRING);
+
+  @Before
+  public void setup() {
+    MockitoAnnotations.initMocks(this);
+
+    input = TestPipeline.create().apply(Create.of(1L, 2L, 3L));
+    factory = new WindowEvaluatorFactory();
+    bundleFactory = InProcessBundleFactory.create();
+  }
+
+  @Test
+  public void nullWindowFunSucceeds() throws Exception {
+    // Only triggering is configured here; no WindowFn is supplied through Window.into.
+    Bound<Long> triggerOnly =
+        Window.<Long>triggering(
+                AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(1)))
+            .accumulatingFiredPanes();
+    PCollection<Long> output = input.apply(triggerOnly);
+
+    CommittedBundle<Long> inputBundle = createInputBundle();
+    UncommittedBundle<Long> expectedBundle = createOutputBundle(output, inputBundle);
+
+    InProcessTransformResult result = runEvaluator(output, inputBundle, triggerOnly);
+    CommittedBundle<Long> committed = commitOnlyOutput(result, expectedBundle);
+
+    // The elements are expected to come out exactly as they went in.
+    assertThat(committed.getElements(), containsInAnyOrder(third, first, second));
+  }
+
+  @Test
+  public void singleWindowFnSucceeds() throws Exception {
+    Duration size = Duration.standardDays(7);
+    Bound<Long> intoFixed = Window.<Long>into(FixedWindows.of(size));
+    PCollection<Long> windowed = input.apply(intoFixed);
+
+    CommittedBundle<Long> inputBundle = createInputBundle();
+    UncommittedBundle<Long> expectedBundle = createOutputBundle(windowed, inputBundle);
+
+    InProcessTransformResult result = runEvaluator(windowed, inputBundle, intoFixed);
+    CommittedBundle<Long> committed = commitOnlyOutput(result, expectedBundle);
+
+    // 'first' and 'second' are expected in the window starting at EPOCH, 'third' in the one
+    // ending at EPOCH.
+    BoundedWindow fromEpoch = new IntervalWindow(EPOCH, EPOCH.plus(size));
+    BoundedWindow untilEpoch = new IntervalWindow(EPOCH.minus(size), EPOCH);
+
+    WindowedValue<Long> rewindowedFirst =
+        WindowedValue.of(3L, new Instant(2L), fromEpoch, PaneInfo.NO_FIRING);
+    WindowedValue<Long> rewindowedSecond =
+        WindowedValue.of(1L, EPOCH.plus(Duration.standardDays(3)), fromEpoch, PaneInfo.NO_FIRING);
+    WindowedValue<Long> rewindowedThird =
+        WindowedValue.of(2L, new Instant(-10L), untilEpoch, PaneInfo.NO_FIRING);
+    assertThat(
+        committed.getElements(),
+        containsInAnyOrder(rewindowedFirst, rewindowedSecond, rewindowedThird));
+  }
+
+  @Test
+  public void multipleWindowsWindowFnSucceeds() throws Exception {
+    Duration size = Duration.standardDays(6);
+    Duration period = Duration.standardDays(3);
+    Bound<Long> intoSliding = Window.into(SlidingWindows.of(size).every(period));
+    PCollection<Long> windowed = input.apply(intoSliding);
+
+    CommittedBundle<Long> inputBundle = createInputBundle();
+    UncommittedBundle<Long> expectedBundle = createOutputBundle(windowed, inputBundle);
+
+    InProcessTransformResult result = runEvaluator(windowed, inputBundle, intoSliding);
+    CommittedBundle<Long> committed = commitOnlyOutput(result, expectedBundle);
+
+    // Four six-day windows, each starting three days after the previous one.
+    BoundedWindow fromMinusSize = new IntervalWindow(EPOCH.minus(size), EPOCH);
+    BoundedWindow fromMinusSlide =
+        new IntervalWindow(EPOCH.minus(size).plus(period), EPOCH.plus(period));
+    BoundedWindow fromEpoch = new IntervalWindow(EPOCH, EPOCH.plus(size));
+    BoundedWindow fromPlusSlide =
+        new IntervalWindow(EPOCH.plus(period), EPOCH.plus(period).plus(size));
+
+    // Every element is expected in exactly two of the overlapping windows.
+    WindowedValue<Long> slidFirst =
+        WindowedValue.of(
+            first.getValue(),
+            first.getTimestamp(),
+            ImmutableSet.of(fromEpoch, fromMinusSlide),
+            PaneInfo.NO_FIRING);
+    WindowedValue<Long> slidSecond =
+        WindowedValue.of(
+            second.getValue(),
+            second.getTimestamp(),
+            ImmutableSet.of(fromEpoch, fromPlusSlide),
+            PaneInfo.NO_FIRING);
+    WindowedValue<Long> slidThird =
+        WindowedValue.of(
+            third.getValue(),
+            third.getTimestamp(),
+            ImmutableSet.of(fromMinusSize, fromMinusSlide),
+            PaneInfo.NO_FIRING);
+
+    assertThat(committed.getElements(), containsInAnyOrder(slidFirst, slidSecond, slidThird));
+  }
+
+  /** Commits a root bundle of {@code input} holding the three test elements. */
+  private CommittedBundle<Long> createInputBundle() {
+    return bundleFactory
+        .createRootBundle(input)
+        .add(first)
+        .add(second)
+        .add(third)
+        .commit(Instant.now());
+  }
+
+  /**
+   * Creates the bundle for {@code output} and stubs the mocked evaluation context to hand out that
+   * same bundle for the given input bundle and output.
+   */
+  private UncommittedBundle<Long> createOutputBundle(
+      PCollection<Long> output, CommittedBundle<Long> inputBundle) {
+    UncommittedBundle<Long> created = bundleFactory.createBundle(inputBundle, output);
+    when(evaluationContext.createBundle(inputBundle, output)).thenReturn(created);
+    return created;
+  }
+
+  /**
+   * Asserts that {@code expected} is the only output bundle of {@code result}, then commits it at
+   * the current time and returns the committed bundle.
+   */
+  private CommittedBundle<Long> commitOnlyOutput(
+      InProcessTransformResult result, UncommittedBundle<Long> expected) {
+    assertThat(
+        Iterables.getOnlyElement(result.getOutputBundles()),
+        Matchers.<UncommittedBundle<?>>equalTo(expected));
+    return expected.commit(Instant.now());
+  }
+
+  /**
+   * Obtains an evaluator for the window transform from the factory, processes the three test
+   * elements in order and returns the result of finishing the bundle.
+   */
+  private InProcessTransformResult runEvaluator(
+      PCollection<Long> windowed,
+      CommittedBundle<Long> inputBundle,
+      Window.Bound<Long> windowTransform /* Required while Window.Bound is a composite */)
+      throws Exception {
+    TransformEvaluator<Long> evaluator =
+        factory.forApplication(
+            AppliedPTransform.of("Window", input, windowed, windowTransform),
+            inputBundle,
+            evaluationContext);
+
+    evaluator.processElement(first);
+    evaluator.processElement(second);
+    evaluator.processElement(third);
+    return evaluator.finishBundle();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/pom.xml
----------------------------------------------------------------------
diff --git a/runners/pom.xml b/runners/pom.xml
index 7b2f356..74812e8 100644
--- a/runners/pom.xml
+++ b/runners/pom.xml
@@ -35,6 +35,7 @@
<name>Apache Beam :: Runners</name>
<modules>
+ <module>direct-java</module>
<module>flink</module>
<module>spark</module>
</modules>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/AbstractModelEnforcement.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/AbstractModelEnforcement.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/AbstractModelEnforcement.java
deleted file mode 100644
index fe9c165..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/AbstractModelEnforcement.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners.inprocess;
-
-import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
-import org.apache.beam.sdk.util.WindowedValue;
-
-/**
- * An abstract {@link ModelEnforcement} that provides default empty implementations for each method.
- */
-abstract class AbstractModelEnforcement<T> implements ModelEnforcement<T> {
- @Override
- public void beforeElement(WindowedValue<T> element) {}
-
- @Override
- public void afterElement(WindowedValue<T> element) {}
-
- @Override
- public void afterFinish(
- CommittedBundle<T> input,
- InProcessTransformResult result,
- Iterable<? extends CommittedBundle<?>> outputs) {}
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/AvroIOShardedWriteFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/AvroIOShardedWriteFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/AvroIOShardedWriteFactory.java
deleted file mode 100644
index e19ffe4..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/AvroIOShardedWriteFactory.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners.inprocess;
-
-import org.apache.beam.sdk.io.AvroIO;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.util.IOChannelUtils;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
-import org.apache.beam.sdk.values.PInput;
-import org.apache.beam.sdk.values.POutput;
-
-/**
- * A {@link PTransformOverrideFactory} that replaces sharded {@link AvroIO.Write.Bound} transforms
- * with a {@link ShardControlledWrite}.
- *
- * <p>A write is replaced when it requests more than one shard, or exactly one shard together with
- * a non-empty shard name template. Every other transform is returned unchanged.
- */
-class AvroIOShardedWriteFactory implements PTransformOverrideFactory {
-  @Override
-  public <InputT extends PInput, OutputT extends POutput> PTransform<InputT, OutputT> override(
-      PTransform<InputT, OutputT> transform) {
-    if (!(transform instanceof AvroIO.Write.Bound)) {
-      return transform;
-    }
-    @SuppressWarnings("unchecked")
-    AvroIO.Write.Bound<InputT> originalWrite = (AvroIO.Write.Bound<InputT>) transform;
-    if (!requiresShardControl(originalWrite)) {
-      return transform;
-    }
-    @SuppressWarnings("unchecked")
-    PTransform<InputT, OutputT> override =
-        (PTransform<InputT, OutputT>) new AvroIOShardedWrite<InputT>(originalWrite);
-    return override;
-  }
-
-  /**
-   * Returns whether {@code write} asks for more than one shard, or for exactly one shard whose
-   * shard name template is not the empty string.
-   */
-  private static boolean requiresShardControl(AvroIO.Write.Bound<?> write) {
-    return write.getNumShards() > 1
-        || (write.getNumShards() == 1 && !"".equals(write.getShardNameTemplate()));
-  }
-
-  /**
-   * A {@link ShardControlledWrite} backed by the original {@link AvroIO.Write.Bound}.
-   *
-   * <p>Declared {@code static} because it never touches the enclosing factory; a non-static inner
-   * class would keep a hidden reference to the factory in every instance.
-   * NOTE(review): if these transforms are ever Java-serialized, that hidden reference would drag
-   * the factory along as well -- confirm whether that mattered for callers.
-   */
-  private static class AvroIOShardedWrite<InputT> extends ShardControlledWrite<InputT> {
-    private final AvroIO.Write.Bound<InputT> initial;
-
-    private AvroIOShardedWrite(AvroIO.Write.Bound<InputT> initial) {
-      this.initial = initial;
-    }
-
-    @Override
-    int getNumShards() {
-      return initial.getNumShards();
-    }
-
-    /**
-     * Returns an unsharded copy of the original write that targets the file name constructed for
-     * shard {@code shardNum}.
-     */
-    @Override
-    PTransform<? super PCollection<InputT>, PDone> getSingleShardTransform(int shardNum) {
-      String shardName =
-          IOChannelUtils.constructName(
-              initial.getFilenamePrefix(),
-              initial.getShardNameTemplate(),
-              initial.getFilenameSuffix(),
-              shardNum,
-              getNumShards());
-      // The original suffix was already passed to constructName above, so the returned write is
-      // given an empty suffix (presumably so that it is not appended a second time).
-      return initial.withoutSharding().to(shardName).withSuffix("");
-    }
-
-    @Override
-    protected PTransform<PCollection<InputT>, PDone> delegate() {
-      return initial;
-    }
-  }
-}