You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/08/04 01:26:14 UTC
[06/19] incubator-beam git commit: Rename DoFn to OldDoFn
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnWithContextTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnWithContextTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnWithContextTest.java
index 3b314b2..8b00c03 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnWithContextTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnWithContextTest.java
@@ -142,9 +142,9 @@ public class DoFnWithContextTest implements Serializable {
@Test
public void testDoFnWithContextUsingAggregators() {
NoOpDoFn<Object, Object> noOpFn = new NoOpDoFn<>();
- DoFn<Object, Object>.Context context = noOpFn.context();
+ OldDoFn<Object, Object>.Context context = noOpFn.context();
- DoFn<Object, Object> fn = spy(noOpFn);
+ OldDoFn<Object, Object> fn = spy(noOpFn);
context = spy(context);
@SuppressWarnings("unchecked")
@@ -225,7 +225,7 @@ public class DoFnWithContextTest implements Serializable {
}
/**
- * Initialize a test pipeline with the specified {@link DoFn}.
+ * Initialize a test pipeline with the specified {@link DoFnWithContext}.
*/
private <InputT, OutputT> TestPipeline createTestPipeline(DoFnWithContext<InputT, OutputT> fn) {
TestPipeline pipeline = TestPipeline.create();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java
index 80825cb..b81eedb 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlattenTest.java
@@ -130,7 +130,7 @@ public class FlattenTest implements Serializable {
PCollection<String> output = p
.apply(Create.of((Void) null).withCoder(VoidCoder.of()))
- .apply(ParDo.withSideInputs(view).of(new DoFn<Void, String>() {
+ .apply(ParDo.withSideInputs(view).of(new OldDoFn<Void, String>() {
@Override
public void processElement(ProcessContext c) {
for (String side : c.sideInput(view)) {
@@ -339,7 +339,7 @@ public class FlattenTest implements Serializable {
/////////////////////////////////////////////////////////////////////////////
- private static class IdentityFn<T> extends DoFn<T, T> {
+ private static class IdentityFn<T> extends OldDoFn<T, T> {
@Override
public void processElement(ProcessContext c) {
c.output(c.element());
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
index d6e4589..15c3ba8 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.transforms;
import static org.apache.beam.sdk.TestUtils.KvMatcher.isKv;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.hasItem;
import static org.hamcrest.Matchers.empty;
@@ -55,7 +56,6 @@ import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import com.fasterxml.jackson.annotation.JsonCreator;
-
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Assert;
@@ -371,7 +371,7 @@ public class GroupByKeyTest {
pipeline.run();
}
- private static class AssertTimestamp<K, V> extends DoFn<KV<K, V>, Void> {
+ private static class AssertTimestamp<K, V> extends OldDoFn<KV<K, V>, Void> {
private final Instant timestamp;
public AssertTimestamp(Instant timestamp) {
@@ -506,7 +506,7 @@ public class GroupByKeyTest {
* Creates a KV that wraps the original KV together with a random key.
*/
static class AssignRandomKey
- extends DoFn<KV<BadEqualityKey, Long>, KV<Long, KV<BadEqualityKey, Long>>> {
+ extends OldDoFn<KV<BadEqualityKey, Long>, KV<Long, KV<BadEqualityKey, Long>>> {
@Override
public void processElement(ProcessContext c) throws Exception {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/IntraBundleParallelizationTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/IntraBundleParallelizationTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/IntraBundleParallelizationTest.java
index 3355aeb..fa2fae9 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/IntraBundleParallelizationTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/IntraBundleParallelizationTest.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.transforms;
import static org.apache.beam.sdk.testing.SystemNanoTimeSleeper.sleepMillis;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includesDisplayDataFrom;
+
import static org.hamcrest.Matchers.both;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
@@ -74,7 +75,7 @@ public class IntraBundleParallelizationTest {
/**
* Introduces a delay in processing, then passes thru elements.
*/
- private static class DelayFn<T> extends DoFn<T, T> {
+ private static class DelayFn<T> extends OldDoFn<T, T> {
public static final long DELAY_MS = 25;
@Override
@@ -94,7 +95,7 @@ public class IntraBundleParallelizationTest {
/**
* Throws an exception after some number of calls.
*/
- private static class ExceptionThrowingFn<T> extends DoFn<T, T> {
+ private static class ExceptionThrowingFn<T> extends OldDoFn<T, T> {
private ExceptionThrowingFn(int numSuccesses) {
IntraBundleParallelizationTest.numSuccesses.set(numSuccesses);
}
@@ -120,11 +121,11 @@ public class IntraBundleParallelizationTest {
/**
* Measures concurrency of the processElement method.
*/
- private static class ConcurrencyMeasuringFn<T> extends DoFn<T, T> {
+ private static class ConcurrencyMeasuringFn<T> extends OldDoFn<T, T> {
@Override
public void processElement(ProcessContext c) {
// Synchronize on the class to provide synchronous access irrespective of
- // how this DoFn is called.
+ // how this OldDoFn is called.
synchronized (ConcurrencyMeasuringFn.class) {
concurrentElements++;
if (concurrentElements > maxDownstreamConcurrency) {
@@ -154,8 +155,8 @@ public class IntraBundleParallelizationTest {
}
/**
- * Test that the DoFn is parallelized up the the Max Parallelism factor within a bundle, but not
- * greater than that amount.
+ * Test that the OldDoFn is parallelized up to the Max Parallelism factor within a bundle, but
+ * not greater than that amount.
*/
@Test
@Category(NeedsRunner.class)
@@ -224,7 +225,7 @@ public class IntraBundleParallelizationTest {
@Test
public void testDisplayData() {
- DoFn<String, String> fn = new DoFn<String, String>() {
+ OldDoFn<String, String> fn = new OldDoFn<String, String>() {
@Override
public void processElement(ProcessContext c) throws Exception {
}
@@ -248,15 +249,15 @@ public class IntraBundleParallelizationTest {
/**
* Runs the provided doFn inside of an {@link IntraBundleParallelization} transform.
*
- * <p>This method assumes that the DoFn passed to it will call {@link #startConcurrentCall()}
+ * <p>This method assumes that the OldDoFn passed to it will call {@link #startConcurrentCall()}
* before processing each elements and {@link #finishConcurrentCall()} after each element.
*
* @param numElements the size of the input
* @param maxParallelism how many threads to execute in parallel
- * @param doFn the DoFn to execute
- * @return the maximum observed parallelism of the DoFn
+ * @param doFn the OldDoFn to execute
+ * @return the maximum observed parallelism of the OldDoFn
*/
- private int run(int numElements, int maxParallelism, DoFn<Integer, Integer> doFn) {
+ private int run(int numElements, int maxParallelism, OldDoFn<Integer, Integer> doFn) {
Pipeline pipeline = TestPipeline.create();
ArrayList<Integer> data = new ArrayList<>(numElements);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java
index f18504c..b4751d2 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java
@@ -18,6 +18,7 @@
package org.apache.beam.sdk.transforms;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasItem;
import static org.junit.Assert.assertThat;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MaxTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MaxTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MaxTest.java
index 226255a..87fa554 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MaxTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MaxTest.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.transforms;
import static org.apache.beam.sdk.TestUtils.checkCombineFn;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MinTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MinTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MinTest.java
index d7ec322..cd03a74 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MinTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MinTest.java
@@ -20,10 +20,12 @@ package org.apache.beam.sdk.transforms;
import static org.apache.beam.sdk.TestUtils.checkCombineFn;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import org.apache.beam.sdk.transforms.display.DisplayData;
+
import com.google.common.collect.Lists;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/NoOpDoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/NoOpDoFn.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/NoOpDoFn.java
index a389fac..5c43755 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/NoOpDoFn.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/NoOpDoFn.java
@@ -28,35 +28,35 @@ import org.apache.beam.sdk.values.TupleTag;
import org.joda.time.Instant;
/**
- * A {@link DoFn} that does nothing with provided elements. Used for testing
- * methods provided by the DoFn abstract class.
+ * An {@link OldDoFn} that does nothing with provided elements. Used for testing
+ * methods provided by the OldDoFn abstract class.
*
* @param <InputT> unused.
* @param <OutputT> unused.
*/
-class NoOpDoFn<InputT, OutputT> extends DoFn<InputT, OutputT> {
+class NoOpDoFn<InputT, OutputT> extends OldDoFn<InputT, OutputT> {
@Override
- public void processElement(DoFn<InputT, OutputT>.ProcessContext c) throws Exception {
+ public void processElement(OldDoFn<InputT, OutputT>.ProcessContext c) throws Exception {
}
/**
* Returns a new NoOp Context.
*/
- public DoFn<InputT, OutputT>.Context context() {
+ public OldDoFn<InputT, OutputT>.Context context() {
return new NoOpDoFnContext();
}
/**
* Returns a new NoOp Process Context.
*/
- public DoFn<InputT, OutputT>.ProcessContext processContext() {
+ public OldDoFn<InputT, OutputT>.ProcessContext processContext() {
return new NoOpDoFnProcessContext();
}
/**
- * A {@link DoFn.Context} that does nothing and returns exclusively null.
+ * An {@link OldDoFn.Context} that does nothing and returns exclusively null.
*/
- private class NoOpDoFnContext extends DoFn<InputT, OutputT>.Context {
+ private class NoOpDoFnContext extends OldDoFn<InputT, OutputT>.Context {
@Override
public PipelineOptions getPipelineOptions() {
return null;
@@ -82,10 +82,10 @@ class NoOpDoFn<InputT, OutputT> extends DoFn<InputT, OutputT> {
}
/**
- * A {@link DoFn.ProcessContext} that does nothing and returns exclusively
+ * An {@link OldDoFn.ProcessContext} that does nothing and returns exclusively
* null.
*/
- private class NoOpDoFnProcessContext extends DoFn<InputT, OutputT>.ProcessContext {
+ private class NoOpDoFnProcessContext extends OldDoFn<InputT, OutputT>.ProcessContext {
@Override
public InputT element() {
return null;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnContextTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnContextTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnContextTest.java
new file mode 100644
index 0000000..9234ccb
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnContextTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+/**
+ * Tests for {@link OldDoFn.Context}.
+ */
+@RunWith(JUnit4.class)
+public class OldDoFnContextTest {
+
+ @Mock
+ private Aggregator<Long, Long> agg;
+
+ private OldDoFn<Object, Object> fn;
+ private OldDoFn<Object, Object>.Context context;
+
+ @Before
+ public void setup() {
+ MockitoAnnotations.initMocks(this);
+
+ // The fn and context must be real objects (not mocks) so that the constructor runs
+ // and the context can reference its outer OldDoFn instance.
+ NoOpDoFn<Object, Object> noOpFn = new NoOpDoFn<>();
+ OldDoFn<Object, Object>.Context noOpContext = noOpFn.context();
+
+ fn = spy(noOpFn);
+ context = spy(noOpContext);
+ }
+
+ @Test
+ public void testSetupDelegateAggregatorsCreatesAndLinksDelegateAggregators() {
+ Sum.SumLongFn combiner = new Sum.SumLongFn();
+ Aggregator<Long, Long> delegateAggregator =
+ fn.createAggregator("test", combiner);
+
+ when(context.createAggregatorInternal("test", combiner)).thenReturn(agg);
+
+ context.setupDelegateAggregators();
+ delegateAggregator.addValue(1L);
+
+ verify(agg).addValue(1L);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnTest.java
new file mode 100644
index 0000000..49f4366
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnTest.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.isA;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertThat;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.runners.AggregatorValues;
+import org.apache.beam.sdk.testing.NeedsRunner;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.Max.MaxIntegerFn;
+import org.apache.beam.sdk.transforms.Sum.SumIntegerFn;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+
+import com.google.common.collect.ImmutableMap;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * Tests for {@link OldDoFn}.
+ */
+@RunWith(JUnit4.class)
+public class OldDoFnTest implements Serializable {
+
+ @Rule
+ public transient ExpectedException thrown = ExpectedException.none();
+
+ @Test
+ public void testCreateAggregatorWithCombinerSucceeds() {
+ String name = "testAggregator";
+ Sum.SumLongFn combiner = new Sum.SumLongFn();
+
+ OldDoFn<Void, Void> doFn = new NoOpDoFn<>();
+
+ Aggregator<Long, Long> aggregator = doFn.createAggregator(name, combiner);
+
+ assertEquals(name, aggregator.getName());
+ assertEquals(combiner, aggregator.getCombineFn());
+ }
+
+ @Test
+ public void testCreateAggregatorWithNullNameThrowsException() {
+ thrown.expect(NullPointerException.class);
+ thrown.expectMessage("name cannot be null");
+
+ OldDoFn<Void, Void> doFn = new NoOpDoFn<>();
+
+ doFn.createAggregator(null, new Sum.SumLongFn());
+ }
+
+ @Test
+ public void testCreateAggregatorWithNullCombineFnThrowsException() {
+ CombineFn<Object, Object, Object> combiner = null;
+
+ thrown.expect(NullPointerException.class);
+ thrown.expectMessage("combiner cannot be null");
+
+ OldDoFn<Void, Void> doFn = new NoOpDoFn<>();
+
+ doFn.createAggregator("testAggregator", combiner);
+ }
+
+ @Test
+ public void testCreateAggregatorWithNullSerializableFnThrowsException() {
+ SerializableFunction<Iterable<Object>, Object> combiner = null;
+
+ thrown.expect(NullPointerException.class);
+ thrown.expectMessage("combiner cannot be null");
+
+ OldDoFn<Void, Void> doFn = new NoOpDoFn<>();
+
+ doFn.createAggregator("testAggregator", combiner);
+ }
+
+ @Test
+ public void testCreateAggregatorWithSameNameThrowsException() {
+ String name = "testAggregator";
+ CombineFn<Double, ?, Double> combiner = new Max.MaxDoubleFn();
+
+ OldDoFn<Void, Void> doFn = new NoOpDoFn<>();
+
+ doFn.createAggregator(name, combiner);
+
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage("Cannot create");
+ thrown.expectMessage(name);
+ thrown.expectMessage("already exists");
+
+ doFn.createAggregator(name, combiner);
+ }
+
+ @Test
+ public void testCreateAggregatorsWithDifferentNamesSucceeds() {
+ String nameOne = "testAggregator";
+ String nameTwo = "aggregatorPrime";
+ CombineFn<Double, ?, Double> combiner = new Max.MaxDoubleFn();
+
+ OldDoFn<Void, Void> doFn = new NoOpDoFn<>();
+
+ Aggregator<Double, Double> aggregatorOne =
+ doFn.createAggregator(nameOne, combiner);
+ Aggregator<Double, Double> aggregatorTwo =
+ doFn.createAggregator(nameTwo, combiner);
+
+ assertNotEquals(aggregatorOne, aggregatorTwo);
+ }
+
+ @Test
+ @Category(NeedsRunner.class)
+ public void testCreateAggregatorInStartBundleThrows() {
+ TestPipeline p = createTestPipeline(new OldDoFn<String, String>() {
+ @Override
+ public void startBundle(OldDoFn<String, String>.Context c) throws Exception {
+ createAggregator("anyAggregate", new MaxIntegerFn());
+ }
+
+ @Override
+ public void processElement(OldDoFn<String, String>.ProcessContext c) throws Exception {}
+ });
+
+ thrown.expect(PipelineExecutionException.class);
+ thrown.expectCause(isA(IllegalStateException.class));
+
+ p.run();
+ }
+
+ @Test
+ @Category(NeedsRunner.class)
+ public void testCreateAggregatorInProcessElementThrows() {
+ TestPipeline p = createTestPipeline(new OldDoFn<String, String>() {
+ @Override
+ public void processElement(ProcessContext c) throws Exception {
+ createAggregator("anyAggregate", new MaxIntegerFn());
+ }
+ });
+
+ thrown.expect(PipelineExecutionException.class);
+ thrown.expectCause(isA(IllegalStateException.class));
+
+ p.run();
+ }
+
+ @Test
+ @Category(NeedsRunner.class)
+ public void testCreateAggregatorInFinishBundleThrows() {
+ TestPipeline p = createTestPipeline(new OldDoFn<String, String>() {
+ @Override
+ public void finishBundle(OldDoFn<String, String>.Context c) throws Exception {
+ createAggregator("anyAggregate", new MaxIntegerFn());
+ }
+
+ @Override
+ public void processElement(OldDoFn<String, String>.ProcessContext c) throws Exception {}
+ });
+
+ thrown.expect(PipelineExecutionException.class);
+ thrown.expectCause(isA(IllegalStateException.class));
+
+ p.run();
+ }
+
+ /**
+ * Initialize a test pipeline with the specified {@link OldDoFn}.
+ */
+ private <InputT, OutputT> TestPipeline createTestPipeline(OldDoFn<InputT, OutputT> fn) {
+ TestPipeline pipeline = TestPipeline.create();
+ pipeline.apply(Create.of((InputT) null))
+ .apply(ParDo.of(fn));
+
+ return pipeline;
+ }
+
+ @Test
+ public void testPopulateDisplayDataDefaultBehavior() {
+ OldDoFn<String, String> usesDefault =
+ new OldDoFn<String, String>() {
+ @Override
+ public void processElement(ProcessContext c) throws Exception {}
+ };
+
+ DisplayData data = DisplayData.from(usesDefault);
+ assertThat(data.items(), empty());
+ }
+
+ @Test
+ @Category(NeedsRunner.class)
+ public void testAggregators() throws Exception {
+ Pipeline pipeline = TestPipeline.create();
+
+ CountOddsFn countOdds = new CountOddsFn();
+ pipeline
+ .apply(Create.of(1, 3, 5, 7, 2, 4, 6, 8, 10, 12, 14, 20, 42, 68, 100))
+ .apply(ParDo.of(countOdds));
+ PipelineResult result = pipeline.run();
+
+ AggregatorValues<Integer> values = result.getAggregatorValues(countOdds.aggregator);
+ assertThat(values.getValuesAtSteps(),
+ equalTo((Map<String, Integer>) ImmutableMap.<String, Integer>of("ParDo(CountOdds)", 4)));
+ }
+
+ private static class CountOddsFn extends OldDoFn<Integer, Void> {
+ @Override
+ public void processElement(ProcessContext c) throws Exception {
+ if (c.element() % 2 == 1) {
+ aggregator.addValue(1);
+ }
+ }
+
+ Aggregator<Integer, Integer> aggregator =
+ createAggregator("odds", new SumIntegerFn());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index 868270c..0a6eab0 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -26,6 +26,7 @@ import static org.apache.beam.sdk.util.StringUtils.byteArrayToJsonString;
import static org.apache.beam.sdk.util.StringUtils.jsonStringToByteArray;
import static com.google.common.base.Preconditions.checkNotNull;
+
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.equalTo;
@@ -43,7 +44,7 @@ import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.RunnableOnService;
import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.DoFn.RequiresWindowAccess;
+import org.apache.beam.sdk.transforms.OldDoFn.RequiresWindowAccess;
import org.apache.beam.sdk.transforms.ParDo.Bound;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.DisplayData.Builder;
@@ -59,7 +60,6 @@ import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import com.fasterxml.jackson.annotation.JsonCreator;
-
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Rule;
@@ -89,7 +89,9 @@ public class ParDoTest implements Serializable {
@Rule
public transient ExpectedException thrown = ExpectedException.none();
- private static class PrintingDoFn extends DoFn<String, String> implements RequiresWindowAccess {
+ private static class PrintingOldDoFn extends OldDoFn<String, String> implements
+ RequiresWindowAccess {
+
@Override
public void processElement(ProcessContext c) {
c.output(c.element() + ":" + c.timestamp().getMillis()
@@ -97,17 +99,17 @@ public class ParDoTest implements Serializable {
}
}
- static class TestDoFn extends DoFn<Integer, String> {
+ static class TestOldDoFn extends OldDoFn<Integer, String> {
enum State { UNSTARTED, STARTED, PROCESSING, FINISHED }
State state = State.UNSTARTED;
final List<PCollectionView<Integer>> sideInputViews = new ArrayList<>();
final List<TupleTag<String>> sideOutputTupleTags = new ArrayList<>();
- public TestDoFn() {
+ public TestOldDoFn() {
}
- public TestDoFn(List<PCollectionView<Integer>> sideInputViews,
+ public TestOldDoFn(List<PCollectionView<Integer>> sideInputViews,
List<TupleTag<String>> sideOutputTupleTags) {
this.sideInputViews.addAll(sideInputViews);
this.sideOutputTupleTags.addAll(sideOutputTupleTags);
@@ -161,9 +163,9 @@ public class ParDoTest implements Serializable {
}
}
- static class TestNoOutputDoFn extends DoFn<Integer, String> {
+ static class TestNoOutputDoFn extends OldDoFn<Integer, String> {
@Override
- public void processElement(DoFn<Integer, String>.ProcessContext c) throws Exception {}
+ public void processElement(OldDoFn<Integer, String>.ProcessContext c) throws Exception {}
}
static class TestDoFnWithContext extends DoFnWithContext<Integer, String> {
@@ -229,7 +231,7 @@ public class ParDoTest implements Serializable {
}
}
- static class TestStartBatchErrorDoFn extends DoFn<Integer, String> {
+ static class TestStartBatchErrorDoFn extends OldDoFn<Integer, String> {
@Override
public void startBundle(Context c) {
throw new RuntimeException("test error in initialize");
@@ -241,14 +243,14 @@ public class ParDoTest implements Serializable {
}
}
- static class TestProcessElementErrorDoFn extends DoFn<Integer, String> {
+ static class TestProcessElementErrorDoFn extends OldDoFn<Integer, String> {
@Override
public void processElement(ProcessContext c) {
throw new RuntimeException("test error in process");
}
}
- static class TestFinishBatchErrorDoFn extends DoFn<Integer, String> {
+ static class TestFinishBatchErrorDoFn extends OldDoFn<Integer, String> {
@Override
public void processElement(ProcessContext c) {
// This has to be here.
@@ -260,13 +262,13 @@ public class ParDoTest implements Serializable {
}
}
- private static class StrangelyNamedDoer extends DoFn<Integer, String> {
+ private static class StrangelyNamedDoer extends OldDoFn<Integer, String> {
@Override
public void processElement(ProcessContext c) {
}
}
- static class TestOutputTimestampDoFn extends DoFn<Integer, Integer> {
+ static class TestOutputTimestampDoFn extends OldDoFn<Integer, Integer> {
@Override
public void processElement(ProcessContext c) {
Integer value = c.element();
@@ -274,7 +276,7 @@ public class ParDoTest implements Serializable {
}
}
- static class TestShiftTimestampDoFn extends DoFn<Integer, Integer> {
+ static class TestShiftTimestampDoFn extends OldDoFn<Integer, Integer> {
private Duration allowedTimestampSkew;
private Duration durationToShift;
@@ -297,7 +299,7 @@ public class ParDoTest implements Serializable {
}
}
- static class TestFormatTimestampDoFn extends DoFn<Integer, String> {
+ static class TestFormatTimestampDoFn extends OldDoFn<Integer, String> {
@Override
public void processElement(ProcessContext c) {
checkNotNull(c.timestamp());
@@ -318,7 +320,7 @@ public class ParDoTest implements Serializable {
return PCollectionTuple.of(BY2, by2).and(BY3, by3);
}
- static class FilterFn extends DoFn<Integer, Integer> {
+ static class FilterFn extends OldDoFn<Integer, Integer> {
private final int divisor;
FilterFn(int divisor) {
@@ -343,7 +345,7 @@ public class ParDoTest implements Serializable {
PCollection<String> output = pipeline
.apply(Create.of(inputs))
- .apply(ParDo.of(new TestDoFn()));
+ .apply(ParDo.of(new TestOldDoFn()));
PAssert.that(output)
.satisfies(ParDoTest.HasExpectedOutput.forInput(inputs));
@@ -377,7 +379,7 @@ public class ParDoTest implements Serializable {
PCollection<String> output = pipeline
.apply(Create.of(inputs).withCoder(VarIntCoder.of()))
- .apply("TestDoFn", ParDo.of(new TestDoFn()));
+ .apply("TestOldDoFn", ParDo.of(new TestOldDoFn()));
PAssert.that(output)
.satisfies(ParDoTest.HasExpectedOutput.forInput(inputs));
@@ -395,7 +397,7 @@ public class ParDoTest implements Serializable {
PCollection<String> output = pipeline
.apply(Create.of(inputs).withCoder(VarIntCoder.of()))
- .apply("TestDoFn", ParDo.of(new TestNoOutputDoFn()));
+ .apply("TestOldDoFn", ParDo.of(new TestNoOutputDoFn()));
PAssert.that(output).empty();
@@ -418,7 +420,7 @@ public class ParDoTest implements Serializable {
PCollectionTuple outputs = pipeline
.apply(Create.of(inputs))
.apply(ParDo
- .of(new TestDoFn(
+ .of(new TestOldDoFn(
Arrays.<PCollectionView<Integer>>asList(),
Arrays.asList(sideOutputTag1, sideOutputTag2, sideOutputTag3)))
.withOutputTags(
@@ -461,7 +463,7 @@ public class ParDoTest implements Serializable {
PCollectionTuple outputs = pipeline
.apply(Create.of(inputs))
.apply(ParDo
- .of(new TestDoFn(
+ .of(new TestOldDoFn(
Arrays.<PCollectionView<Integer>>asList(),
Arrays.asList(sideOutputTag1, sideOutputTag2, sideOutputTag3)))
.withOutputTags(
@@ -527,7 +529,7 @@ public class ParDoTest implements Serializable {
PCollectionTuple outputs = pipeline
.apply(Create.of(inputs))
.apply(ParDo.withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag))
- .of(new DoFn<Integer, Void>(){
+ .of(new OldDoFn<Integer, Void>(){
@Override
public void processElement(ProcessContext c) {
c.sideOutput(sideOutputTag, c.element());
@@ -550,7 +552,7 @@ public class ParDoTest implements Serializable {
PCollection<String> output = pipeline
.apply(Create.of(inputs))
- .apply(ParDo.of(new TestDoFn(
+ .apply(ParDo.of(new TestOldDoFn(
Arrays.<PCollectionView<Integer>>asList(),
Arrays.asList(sideTag))));
@@ -569,7 +571,7 @@ public class ParDoTest implements Serializable {
// Success for a total of 1000 outputs.
input
- .apply("Success1000", ParDo.of(new DoFn<Integer, String>() {
+ .apply("Success1000", ParDo.of(new OldDoFn<Integer, String>() {
@Override
public void processElement(ProcessContext c) {
TupleTag<String> specialSideTag = new TupleTag<String>(){};
@@ -585,7 +587,7 @@ public class ParDoTest implements Serializable {
// Failure for a total of 1001 outputs.
input
- .apply("Failure1001", ParDo.of(new DoFn<Integer, String>() {
+ .apply("Failure1001", ParDo.of(new OldDoFn<Integer, String>() {
@Override
public void processElement(ProcessContext c) {
for (int i = 0; i < 1000; i++) {
@@ -618,7 +620,7 @@ public class ParDoTest implements Serializable {
PCollection<String> output = pipeline
.apply(Create.of(inputs))
.apply(ParDo.withSideInputs(sideInput1, sideInputUnread, sideInput2)
- .of(new TestDoFn(
+ .of(new TestOldDoFn(
Arrays.asList(sideInput1, sideInput2),
Arrays.<TupleTag<String>>asList())));
@@ -652,7 +654,7 @@ public class ParDoTest implements Serializable {
.apply(ParDo.withSideInputs(sideInput1)
.withSideInputs(sideInputUnread)
.withSideInputs(sideInput2)
- .of(new TestDoFn(
+ .of(new TestOldDoFn(
Arrays.asList(sideInput1, sideInput2),
Arrays.<TupleTag<String>>asList())));
@@ -690,7 +692,7 @@ public class ParDoTest implements Serializable {
.withSideInputs(sideInputUnread)
.withSideInputs(sideInput2)
.withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag))
- .of(new TestDoFn(
+ .of(new TestOldDoFn(
Arrays.asList(sideInput1, sideInput2),
Arrays.<TupleTag<String>>asList())));
@@ -728,7 +730,7 @@ public class ParDoTest implements Serializable {
.withSideInputs(sideInputUnread)
.withSideInputs(sideInput2)
.withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag))
- .of(new TestDoFn(
+ .of(new TestOldDoFn(
Arrays.asList(sideInput1, sideInput2),
Arrays.<TupleTag<String>>asList())));
@@ -752,7 +754,7 @@ public class ParDoTest implements Serializable {
.apply(View.<Integer>asSingleton());
pipeline.apply("CreateMain", Create.of(inputs))
- .apply(ParDo.of(new TestDoFn(
+ .apply(ParDo.of(new TestOldDoFn(
Arrays.<PCollectionView<Integer>>asList(sideView),
Arrays.<TupleTag<String>>asList())));
@@ -815,18 +817,18 @@ public class ParDoTest implements Serializable {
.setName("MyInput");
{
- PCollection<String> output1 = input.apply(ParDo.of(new TestDoFn()));
+ PCollection<String> output1 = input.apply(ParDo.of(new TestOldDoFn()));
assertEquals("ParDo(Test).out", output1.getName());
}
{
- PCollection<String> output2 = input.apply("MyParDo", ParDo.of(new TestDoFn()));
+ PCollection<String> output2 = input.apply("MyParDo", ParDo.of(new TestOldDoFn()));
assertEquals("MyParDo.out", output2.getName());
}
{
- PCollection<String> output4 = input.apply("TestDoFn", ParDo.of(new TestDoFn()));
- assertEquals("TestDoFn.out", output4.getName());
+ PCollection<String> output4 = input.apply("TestOldDoFn", ParDo.of(new TestOldDoFn()));
+ assertEquals("TestOldDoFn.out", output4.getName());
}
{
@@ -835,7 +837,7 @@ public class ParDoTest implements Serializable {
output5.getName());
}
- assertEquals("ParDo(Printing)", ParDo.of(new PrintingDoFn()).getName());
+ assertEquals("ParDo(Printing)", ParDo.of(new PrintingOldDoFn()).getName());
assertEquals(
"ParMultiDo(SideOutputDummy)",
@@ -855,7 +857,7 @@ public class ParDoTest implements Serializable {
PCollectionTuple outputs = p
.apply(Create.of(Arrays.asList(3, -42, 666))).setName("MyInput")
.apply("MyParDo", ParDo
- .of(new TestDoFn(
+ .of(new TestOldDoFn(
Arrays.<PCollectionView<Integer>>asList(),
Arrays.asList(sideOutputTag1, sideOutputTag2, sideOutputTag3)))
.withOutputTags(
@@ -883,7 +885,7 @@ public class ParDoTest implements Serializable {
.apply("CustomTransform", new PTransform<PCollection<Integer>, PCollection<String>>() {
@Override
public PCollection<String> apply(PCollection<Integer> input) {
- return input.apply(ParDo.of(new TestDoFn()));
+ return input.apply(ParDo.of(new TestOldDoFn()));
}
});
@@ -920,7 +922,7 @@ public class ParDoTest implements Serializable {
@Test
public void testJsonEscaping() {
// Declare an arbitrary function and make sure we can serialize it
- DoFn<Integer, Integer> doFn = new DoFn<Integer, Integer>() {
+ OldDoFn<Integer, Integer> doFn = new OldDoFn<Integer, Integer>() {
@Override
public void processElement(ProcessContext c) {
c.output(c.element() + 1);
@@ -973,7 +975,7 @@ public class ParDoTest implements Serializable {
}
}
- private static class SideOutputDummyFn extends DoFn<Integer, Integer> {
+ private static class SideOutputDummyFn extends OldDoFn<Integer, Integer> {
private TupleTag<TestDummy> sideTag;
public SideOutputDummyFn(TupleTag<TestDummy> sideTag) {
this.sideTag = sideTag;
@@ -985,7 +987,7 @@ public class ParDoTest implements Serializable {
}
}
- private static class MainOutputDummyFn extends DoFn<Integer, TestDummy> {
+ private static class MainOutputDummyFn extends OldDoFn<Integer, TestDummy> {
private TupleTag<Integer> sideTag;
public MainOutputDummyFn(TupleTag<Integer> sideTag) {
this.sideTag = sideTag;
@@ -1167,7 +1169,7 @@ public class ParDoTest implements Serializable {
.apply(ParDo
.withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag))
.of(
- new DoFn<TestDummy, TestDummy>() {
+ new OldDoFn<TestDummy, TestDummy>() {
@Override public void processElement(ProcessContext context) {
TestDummy element = context.element();
context.output(element);
@@ -1181,7 +1183,7 @@ public class ParDoTest implements Serializable {
// on a missing coder.
tuple.get(mainOutputTag)
.setCoder(TestDummyCoder.of())
- .apply("Output1", ParDo.of(new DoFn<TestDummy, Integer>() {
+ .apply("Output1", ParDo.of(new OldDoFn<TestDummy, Integer>() {
@Override public void processElement(ProcessContext context) {
context.output(1);
}
@@ -1228,7 +1230,7 @@ public class ParDoTest implements Serializable {
PCollection<String> output =
input
.apply(ParDo.withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag)).of(
- new DoFn<Integer, Integer>() {
+ new OldDoFn<Integer, Integer>() {
@Override
public void processElement(ProcessContext c) {
c.sideOutputWithTimestamp(
@@ -1349,7 +1351,7 @@ public class ParDoTest implements Serializable {
PCollection<String> output = pipeline
.apply(Create.timestamped(TimestampedValue.of("elem", new Instant(1))))
.apply(Window.<String>into(FixedWindows.of(Duration.millis(1))))
- .apply(ParDo.of(new DoFn<String, String>() {
+ .apply(ParDo.of(new OldDoFn<String, String>() {
@Override
public void startBundle(Context c) {
c.outputWithTimestamp("start", new Instant(2));
@@ -1368,7 +1370,7 @@ public class ParDoTest implements Serializable {
System.out.println("Finish: 3");
}
}))
- .apply(ParDo.of(new PrintingDoFn()));
+ .apply(ParDo.of(new PrintingOldDoFn()));
PAssert.that(output).satisfies(new Checker());
@@ -1383,7 +1385,7 @@ public class ParDoTest implements Serializable {
pipeline
.apply(Create.timestamped(TimestampedValue.of("elem", new Instant(1))))
.apply(Window.<String>into(FixedWindows.of(Duration.millis(1))))
- .apply(ParDo.of(new DoFn<String, String>() {
+ .apply(ParDo.of(new OldDoFn<String, String>() {
@Override
public void startBundle(Context c) {
c.output("start");
@@ -1400,7 +1402,7 @@ public class ParDoTest implements Serializable {
}
@Test
public void testDoFnDisplayData() {
- DoFn<String, String> fn = new DoFn<String, String>() {
+ OldDoFn<String, String> fn = new OldDoFn<String, String>() {
@Override
public void processElement(ProcessContext c) {
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PartitionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PartitionTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PartitionTest.java
index 243b52b..0cc804e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PartitionTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/PartitionTest.java
@@ -18,6 +18,7 @@
package org.apache.beam.sdk.transforms;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java
index fe02573..e7f8cd0 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java
@@ -22,6 +22,7 @@ import static org.apache.beam.sdk.TestUtils.NO_LINES;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
import static com.google.common.base.Preconditions.checkArgument;
+
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/TopTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/TopTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/TopTest.java
index a96d19b..fc0e659 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/TopTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/TopTest.java
@@ -18,6 +18,7 @@
package org.apache.beam.sdk.transforms;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
index 738b492..ee240bf 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java
@@ -18,6 +18,7 @@
package org.apache.beam.sdk.transforms;
import static com.google.common.base.Preconditions.checkArgument;
+
import static org.hamcrest.Matchers.isA;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -98,12 +99,13 @@ public class ViewTest implements Serializable {
PCollection<Integer> output =
pipeline.apply("Create123", Create.of(1, 2, 3))
- .apply("OutputSideInputs", ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() {
- @Override
- public void processElement(ProcessContext c) {
- c.output(c.sideInput(view));
- }
- }));
+ .apply("OutputSideInputs",
+ ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() {
+ @Override
+ public void processElement(ProcessContext c) {
+ c.output(c.sideInput(view));
+ }
+ }));
PAssert.that(output).containsInAnyOrder(47, 47, 47);
@@ -124,16 +126,17 @@ public class ViewTest implements Serializable {
PCollection<Integer> output =
pipeline.apply("Create123", Create.timestamped(
- TimestampedValue.of(1, new Instant(4)),
- TimestampedValue.of(2, new Instant(8)),
- TimestampedValue.of(3, new Instant(12))))
+ TimestampedValue.of(1, new Instant(4)),
+ TimestampedValue.of(2, new Instant(8)),
+ TimestampedValue.of(3, new Instant(12))))
.apply("MainWindowInto", Window.<Integer>into(FixedWindows.of(Duration.millis(10))))
- .apply("OutputSideInputs", ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() {
- @Override
- public void processElement(ProcessContext c) {
- c.output(c.sideInput(view));
- }
- }));
+ .apply("OutputSideInputs",
+ ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() {
+ @Override
+ public void processElement(ProcessContext c) {
+ c.output(c.sideInput(view));
+ }
+ }));
PAssert.that(output).containsInAnyOrder(47, 47, 48);
@@ -150,7 +153,7 @@ public class ViewTest implements Serializable {
.apply(View.<Integer>asSingleton());
pipeline.apply("Create123", Create.of(1, 2, 3))
- .apply("OutputSideInputs", ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() {
+ .apply("OutputSideInputs", ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() {
@Override
public void processElement(ProcessContext c) {
c.output(c.sideInput(view));
@@ -175,7 +178,7 @@ public class ViewTest implements Serializable {
final PCollectionView<Integer> view = oneTwoThree.apply(View.<Integer>asSingleton());
oneTwoThree.apply(
- "OutputSideInputs", ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() {
+ "OutputSideInputs", ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() {
@Override
public void processElement(ProcessContext c) {
c.output(c.sideInput(view));
@@ -201,16 +204,17 @@ public class ViewTest implements Serializable {
PCollection<Integer> output =
pipeline.apply("CreateMainInput", Create.of(29, 31))
- .apply("OutputSideInputs", ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() {
- @Override
- public void processElement(ProcessContext c) {
- checkArgument(c.sideInput(view).size() == 4);
- checkArgument(c.sideInput(view).get(0) == c.sideInput(view).get(0));
- for (Integer i : c.sideInput(view)) {
- c.output(i);
- }
- }
- }));
+ .apply("OutputSideInputs",
+ ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() {
+ @Override
+ public void processElement(ProcessContext c) {
+ checkArgument(c.sideInput(view).size() == 4);
+ checkArgument(c.sideInput(view).get(0) == c.sideInput(view).get(0));
+ for (Integer i : c.sideInput(view)) {
+ c.output(i);
+ }
+ }
+ }));
PAssert.that(output).containsInAnyOrder(11, 13, 17, 23, 11, 13, 17, 23);
@@ -237,19 +241,21 @@ public class ViewTest implements Serializable {
PCollection<Integer> output =
pipeline.apply("CreateMainInput", Create.timestamped(
- TimestampedValue.of(29, new Instant(1)),
- TimestampedValue.of(35, new Instant(11))))
+ TimestampedValue.of(29, new Instant(1)),
+ TimestampedValue.of(35, new Instant(11))))
.apply("MainWindowInto", Window.<Integer>into(FixedWindows.of(Duration.millis(10))))
- .apply("OutputSideInputs", ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() {
- @Override
- public void processElement(ProcessContext c) {
- checkArgument(c.sideInput(view).size() == 4);
- checkArgument(c.sideInput(view).get(0) == c.sideInput(view).get(0));
- for (Integer i : c.sideInput(view)) {
- c.output(i);
- }
- }
- }));
+ .apply(
+ "OutputSideInputs",
+ ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() {
+ @Override
+ public void processElement(ProcessContext c) {
+ checkArgument(c.sideInput(view).size() == 4);
+ checkArgument(c.sideInput(view).get(0) == c.sideInput(view).get(0));
+ for (Integer i : c.sideInput(view)) {
+ c.output(i);
+ }
+ }
+ }));
PAssert.that(output).containsInAnyOrder(11, 13, 17, 23, 31, 33, 37, 43);
@@ -267,16 +273,17 @@ public class ViewTest implements Serializable {
PCollection<Integer> results =
pipeline.apply("Create1", Create.of(1))
- .apply("OutputSideInputs", ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() {
- @Override
- public void processElement(ProcessContext c) {
- assertTrue(c.sideInput(view).isEmpty());
- assertFalse(c.sideInput(view).iterator().hasNext());
- c.output(1);
- }
- }));
+ .apply("OutputSideInputs",
+ ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() {
+ @Override
+ public void processElement(ProcessContext c) {
+ assertTrue(c.sideInput(view).isEmpty());
+ assertFalse(c.sideInput(view).iterator().hasNext());
+ c.output(1);
+ }
+ }));
- // Pass at least one value through to guarantee that DoFn executes.
+ // Pass at least one value through to guarantee that OldDoFn executes.
PAssert.that(results).containsInAnyOrder(1);
pipeline.run();
@@ -292,36 +299,37 @@ public class ViewTest implements Serializable {
PCollection<Integer> output =
pipeline.apply("CreateMainInput", Create.of(29))
- .apply("OutputSideInputs", ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() {
- @Override
- public void processElement(ProcessContext c) {
- try {
- c.sideInput(view).clear();
- fail("Expected UnsupportedOperationException on clear()");
- } catch (UnsupportedOperationException expected) {
- }
- try {
- c.sideInput(view).add(4);
- fail("Expected UnsupportedOperationException on add()");
- } catch (UnsupportedOperationException expected) {
- }
- try {
- c.sideInput(view).addAll(new ArrayList<Integer>());
- fail("Expected UnsupportedOperationException on addAll()");
- } catch (UnsupportedOperationException expected) {
- }
- try {
- c.sideInput(view).remove(0);
- fail("Expected UnsupportedOperationException on remove()");
- } catch (UnsupportedOperationException expected) {
- }
- for (Integer i : c.sideInput(view)) {
- c.output(i);
- }
- }
- }));
+ .apply("OutputSideInputs",
+ ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() {
+ @Override
+ public void processElement(ProcessContext c) {
+ try {
+ c.sideInput(view).clear();
+ fail("Expected UnsupportedOperationException on clear()");
+ } catch (UnsupportedOperationException expected) {
+ }
+ try {
+ c.sideInput(view).add(4);
+ fail("Expected UnsupportedOperationException on add()");
+ } catch (UnsupportedOperationException expected) {
+ }
+ try {
+ c.sideInput(view).addAll(new ArrayList<Integer>());
+ fail("Expected UnsupportedOperationException on addAll()");
+ } catch (UnsupportedOperationException expected) {
+ }
+ try {
+ c.sideInput(view).remove(0);
+ fail("Expected UnsupportedOperationException on remove()");
+ } catch (UnsupportedOperationException expected) {
+ }
+ for (Integer i : c.sideInput(view)) {
+ c.output(i);
+ }
+ }
+ }));
- // Pass at least one value through to guarantee that DoFn executes.
+ // Pass at least one value through to guarantee that OldDoFn executes.
PAssert.that(output).containsInAnyOrder(11);
pipeline.run();
@@ -338,14 +346,15 @@ public class ViewTest implements Serializable {
PCollection<Integer> output =
pipeline.apply("CreateMainInput", Create.of(29, 31))
- .apply("OutputSideInputs", ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() {
- @Override
- public void processElement(ProcessContext c) {
- for (Integer i : c.sideInput(view)) {
- c.output(i);
- }
- }
- }));
+ .apply("OutputSideInputs",
+ ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() {
+ @Override
+ public void processElement(ProcessContext c) {
+ for (Integer i : c.sideInput(view)) {
+ c.output(i);
+ }
+ }
+ }));
PAssert.that(output).containsInAnyOrder(11, 13, 17, 23, 11, 13, 17, 23);
@@ -371,18 +380,21 @@ public class ViewTest implements Serializable {
.apply(View.<Integer>asIterable());
PCollection<Integer> output =
- pipeline.apply("CreateMainInput", Create.timestamped(
- TimestampedValue.of(29, new Instant(1)),
- TimestampedValue.of(35, new Instant(11))))
+ pipeline
+ .apply("CreateMainInput",
+ Create.timestamped(
+ TimestampedValue.of(29, new Instant(1)),
+ TimestampedValue.of(35, new Instant(11))))
.apply("MainWindowInto", Window.<Integer>into(FixedWindows.of(Duration.millis(10))))
- .apply("OutputSideInputs", ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() {
- @Override
- public void processElement(ProcessContext c) {
- for (Integer i : c.sideInput(view)) {
- c.output(i);
- }
- }
- }));
+ .apply("OutputSideInputs",
+ ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() {
+ @Override
+ public void processElement(ProcessContext c) {
+ for (Integer i : c.sideInput(view)) {
+ c.output(i);
+ }
+ }
+ }));
PAssert.that(output).containsInAnyOrder(11, 13, 17, 23, 31, 33, 37, 43);
@@ -400,15 +412,16 @@ public class ViewTest implements Serializable {
PCollection<Integer> results =
pipeline.apply("Create1", Create.of(1))
- .apply("OutputSideInputs", ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() {
- @Override
- public void processElement(ProcessContext c) {
- assertFalse(c.sideInput(view).iterator().hasNext());
- c.output(1);
- }
- }));
+ .apply("OutputSideInputs",
+ ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() {
+ @Override
+ public void processElement(ProcessContext c) {
+ assertFalse(c.sideInput(view).iterator().hasNext());
+ c.output(1);
+ }
+ }));
- // Pass at least one value through to guarantee that DoFn executes.
+ // Pass at least one value through to guarantee that OldDoFn executes.
PAssert.that(results).containsInAnyOrder(1);
pipeline.run();
@@ -424,22 +437,23 @@ public class ViewTest implements Serializable {
PCollection<Integer> output =
pipeline.apply("CreateMainInput", Create.of(29))
- .apply("OutputSideInputs", ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() {
- @Override
- public void processElement(ProcessContext c) {
- Iterator<Integer> iterator = c.sideInput(view).iterator();
- while (iterator.hasNext()) {
- try {
- iterator.remove();
- fail("Expected UnsupportedOperationException on remove()");
- } catch (UnsupportedOperationException expected) {
+ .apply("OutputSideInputs",
+ ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() {
+ @Override
+ public void processElement(ProcessContext c) {
+ Iterator<Integer> iterator = c.sideInput(view).iterator();
+ while (iterator.hasNext()) {
+ try {
+ iterator.remove();
+ fail("Expected UnsupportedOperationException on remove()");
+ } catch (UnsupportedOperationException expected) {
+ }
+ c.output(iterator.next());
+ }
}
- c.output(iterator.next());
- }
- }
- }));
+ }));
- // Pass at least one value through to guarantee that DoFn executes.
+ // Pass at least one value through to guarantee that OldDoFn executes.
PAssert.that(output).containsInAnyOrder(11);
pipeline.run();
@@ -458,7 +472,7 @@ public class ViewTest implements Serializable {
pipeline.apply("CreateMainInput", Create.of("apple", "banana", "blackberry"))
.apply(
"OutputSideInputs",
- ParDo.withSideInputs(view).of(new DoFn<String, KV<String, Integer>>() {
+ ParDo.withSideInputs(view).of(new OldDoFn<String, KV<String, Integer>>() {
@Override
public void processElement(ProcessContext c) {
for (Integer v : c.sideInput(view).get(c.element().substring(0, 1))) {
@@ -486,7 +500,7 @@ public class ViewTest implements Serializable {
pipeline.apply("CreateMainInput", Create.of(2 /* size */))
.apply(
"OutputSideInputs",
- ParDo.withSideInputs(view).of(new DoFn<Integer, KV<String, Integer>>() {
+ ParDo.withSideInputs(view).of(new OldDoFn<Integer, KV<String, Integer>>() {
@Override
public void processElement(ProcessContext c) {
assertEquals((int) c.element(), c.sideInput(view).size());
@@ -540,7 +554,7 @@ public class ViewTest implements Serializable {
pipeline.apply("CreateMainInput", Create.of("apple", "banana", "blackberry"))
.apply(
"OutputSideInputs",
- ParDo.withSideInputs(view).of(new DoFn<String, KV<String, Integer>>() {
+ ParDo.withSideInputs(view).of(new OldDoFn<String, KV<String, Integer>>() {
@Override
public void processElement(ProcessContext c) {
for (Integer v : c.sideInput(view).get(c.element().substring(0, 1))) {
@@ -577,7 +591,7 @@ public class ViewTest implements Serializable {
TimestampedValue.of("blackberry", new Instant(16))))
.apply("MainWindowInto", Window.<String>into(FixedWindows.of(Duration.millis(10))))
.apply("OutputSideInputs", ParDo.withSideInputs(view).of(
- new DoFn<String, KV<String, Integer>>() {
+ new OldDoFn<String, KV<String, Integer>>() {
@Override
public void processElement(ProcessContext c) {
for (Integer v :
@@ -615,7 +629,7 @@ public class ViewTest implements Serializable {
TimestampedValue.of(1 /* size */, new Instant(16))))
.apply("MainWindowInto", Window.<Integer>into(FixedWindows.of(Duration.millis(10))))
.apply("OutputSideInputs", ParDo.withSideInputs(view).of(
- new DoFn<Integer, KV<String, Integer>>() {
+ new OldDoFn<Integer, KV<String, Integer>>() {
@Override
public void processElement(ProcessContext c) {
assertEquals((int) c.element(),
@@ -660,7 +674,7 @@ public class ViewTest implements Serializable {
TimestampedValue.of("blackberry", new Instant(16))))
.apply("MainWindowInto", Window.<String>into(FixedWindows.of(Duration.millis(10))))
.apply("OutputSideInputs", ParDo.withSideInputs(view).of(
- new DoFn<String, KV<String, Integer>>() {
+ new OldDoFn<String, KV<String, Integer>>() {
@Override
public void processElement(ProcessContext c) {
for (Integer v :
@@ -689,17 +703,18 @@ public class ViewTest implements Serializable {
PCollection<Integer> results =
pipeline.apply("Create1", Create.of(1))
- .apply("OutputSideInputs", ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() {
- @Override
- public void processElement(ProcessContext c) {
- assertTrue(c.sideInput(view).isEmpty());
- assertTrue(c.sideInput(view).entrySet().isEmpty());
- assertFalse(c.sideInput(view).entrySet().iterator().hasNext());
- c.output(c.element());
- }
- }));
+ .apply("OutputSideInputs",
+ ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() {
+ @Override
+ public void processElement(ProcessContext c) {
+ assertTrue(c.sideInput(view).isEmpty());
+ assertTrue(c.sideInput(view).entrySet().isEmpty());
+ assertFalse(c.sideInput(view).entrySet().iterator().hasNext());
+ c.output(c.element());
+ }
+ }));
- // Pass at least one value through to guarantee that DoFn executes.
+ // Pass at least one value through to guarantee that OldDoFn executes.
PAssert.that(results).containsInAnyOrder(1);
pipeline.run();
@@ -718,17 +733,18 @@ public class ViewTest implements Serializable {
PCollection<Integer> results =
pipeline.apply("Create1", Create.of(1))
- .apply("OutputSideInputs", ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() {
- @Override
- public void processElement(ProcessContext c) {
- assertTrue(c.sideInput(view).isEmpty());
- assertTrue(c.sideInput(view).entrySet().isEmpty());
- assertFalse(c.sideInput(view).entrySet().iterator().hasNext());
- c.output(c.element());
- }
- }));
+ .apply("OutputSideInputs",
+ ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() {
+ @Override
+ public void processElement(ProcessContext c) {
+ assertTrue(c.sideInput(view).isEmpty());
+ assertTrue(c.sideInput(view).entrySet().isEmpty());
+ assertFalse(c.sideInput(view).entrySet().iterator().hasNext());
+ c.output(c.element());
+ }
+ }));
- // Pass at least one value through to guarantee that DoFn executes.
+ // Pass at least one value through to guarantee that OldDoFn executes.
PAssert.that(results).containsInAnyOrder(1);
pipeline.run();
@@ -747,7 +763,7 @@ public class ViewTest implements Serializable {
pipeline.apply("CreateMainInput", Create.of("apple"))
.apply(
"OutputSideInputs",
- ParDo.withSideInputs(view).of(new DoFn<String, KV<String, Integer>>() {
+ ParDo.withSideInputs(view).of(new OldDoFn<String, KV<String, Integer>>() {
@Override
public void processElement(ProcessContext c) {
try {
@@ -776,7 +792,7 @@ public class ViewTest implements Serializable {
}
}));
- // Pass at least one value through to guarantee that DoFn executes.
+ // Pass at least one value through to guarantee that OldDoFn executes.
PAssert.that(output).containsInAnyOrder(KV.of("apple", 1));
pipeline.run();
@@ -795,7 +811,7 @@ public class ViewTest implements Serializable {
pipeline.apply("CreateMainInput", Create.of("apple", "banana", "blackberry"))
.apply(
"OutputSideInputs",
- ParDo.withSideInputs(view).of(new DoFn<String, KV<String, Integer>>() {
+ ParDo.withSideInputs(view).of(new OldDoFn<String, KV<String, Integer>>() {
@Override
public void processElement(ProcessContext c) {
c.output(
@@ -822,7 +838,7 @@ public class ViewTest implements Serializable {
pipeline.apply("CreateMainInput", Create.of(2 /* size */))
.apply(
"OutputSideInputs",
- ParDo.withSideInputs(view).of(new DoFn<Integer, KV<String, Integer>>() {
+ ParDo.withSideInputs(view).of(new OldDoFn<Integer, KV<String, Integer>>() {
@Override
public void processElement(ProcessContext c) {
assertEquals((int) c.element(), c.sideInput(view).size());
@@ -854,7 +870,7 @@ public class ViewTest implements Serializable {
pipeline.apply("CreateMainInput", Create.of("apple", "banana", "blackberry"))
.apply(
"OutputSideInputs",
- ParDo.withSideInputs(view).of(new DoFn<String, KV<String, Integer>>() {
+ ParDo.withSideInputs(view).of(new OldDoFn<String, KV<String, Integer>>() {
@Override
public void processElement(ProcessContext c) {
c.output(
@@ -890,7 +906,7 @@ public class ViewTest implements Serializable {
TimestampedValue.of("blackberry", new Instant(16))))
.apply("MainWindowInto", Window.<String>into(FixedWindows.of(Duration.millis(10))))
.apply("OutputSideInputs", ParDo.withSideInputs(view).of(
- new DoFn<String, KV<String, Integer>>() {
+ new OldDoFn<String, KV<String, Integer>>() {
@Override
public void processElement(ProcessContext c) {
c.output(KV.of(
@@ -927,7 +943,7 @@ public class ViewTest implements Serializable {
TimestampedValue.of(1 /* size */, new Instant(16))))
.apply("MainWindowInto", Window.<Integer>into(FixedWindows.of(Duration.millis(10))))
.apply("OutputSideInputs", ParDo.withSideInputs(view).of(
- new DoFn<Integer, KV<String, Integer>>() {
+ new OldDoFn<Integer, KV<String, Integer>>() {
@Override
public void processElement(ProcessContext c) {
assertEquals((int) c.element(),
@@ -972,7 +988,7 @@ public class ViewTest implements Serializable {
TimestampedValue.of("blackberry", new Instant(16))))
.apply("MainWindowInto", Window.<String>into(FixedWindows.of(Duration.millis(10))))
.apply("OutputSideInputs", ParDo.withSideInputs(view).of(
- new DoFn<String, KV<String, Integer>>() {
+ new OldDoFn<String, KV<String, Integer>>() {
@Override
public void processElement(ProcessContext c) {
c.output(KV.of(
@@ -1000,17 +1016,18 @@ public class ViewTest implements Serializable {
PCollection<Integer> results =
pipeline.apply("Create1", Create.of(1))
- .apply("OutputSideInputs", ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() {
- @Override
- public void processElement(ProcessContext c) {
- assertTrue(c.sideInput(view).isEmpty());
- assertTrue(c.sideInput(view).entrySet().isEmpty());
- assertFalse(c.sideInput(view).entrySet().iterator().hasNext());
- c.output(c.element());
- }
- }));
+ .apply("OutputSideInputs",
+ ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() {
+ @Override
+ public void processElement(ProcessContext c) {
+ assertTrue(c.sideInput(view).isEmpty());
+ assertTrue(c.sideInput(view).entrySet().isEmpty());
+ assertFalse(c.sideInput(view).entrySet().iterator().hasNext());
+ c.output(c.element());
+ }
+ }));
- // Pass at least one value through to guarantee that DoFn executes.
+ // Pass at least one value through to guarantee that OldDoFn executes.
PAssert.that(results).containsInAnyOrder(1);
pipeline.run();
@@ -1028,17 +1045,18 @@ public class ViewTest implements Serializable {
PCollection<Integer> results =
pipeline.apply("Create1", Create.of(1))
- .apply("OutputSideInputs", ParDo.withSideInputs(view).of(new DoFn<Integer, Integer>() {
- @Override
- public void processElement(ProcessContext c) {
- assertTrue(c.sideInput(view).isEmpty());
- assertTrue(c.sideInput(view).entrySet().isEmpty());
- assertFalse(c.sideInput(view).entrySet().iterator().hasNext());
- c.output(c.element());
- }
- }));
+ .apply("OutputSideInputs",
+ ParDo.withSideInputs(view).of(new OldDoFn<Integer, Integer>() {
+ @Override
+ public void processElement(ProcessContext c) {
+ assertTrue(c.sideInput(view).isEmpty());
+ assertTrue(c.sideInput(view).entrySet().isEmpty());
+ assertFalse(c.sideInput(view).entrySet().iterator().hasNext());
+ c.output(c.element());
+ }
+ }));
- // Pass at least one value through to guarantee that DoFn executes.
+ // Pass at least one value through to guarantee that OldDoFn executes.
PAssert.that(results).containsInAnyOrder(1);
pipeline.run();
@@ -1062,7 +1080,7 @@ public class ViewTest implements Serializable {
pipeline.apply("CreateMainInput", Create.of("apple", "banana", "blackberry"))
.apply(
"OutputSideInputs",
- ParDo.withSideInputs(view).of(new DoFn<String, KV<String, Integer>>() {
+ ParDo.withSideInputs(view).of(new OldDoFn<String, KV<String, Integer>>() {
@Override
public void processElement(ProcessContext c) {
c.output(
@@ -1093,7 +1111,7 @@ public class ViewTest implements Serializable {
pipeline.apply("CreateMainInput", Create.of("apple"))
.apply(
"OutputSideInputs",
- ParDo.withSideInputs(view).of(new DoFn<String, KV<String, Integer>>() {
+ ParDo.withSideInputs(view).of(new OldDoFn<String, KV<String, Integer>>() {
@Override
public void processElement(ProcessContext c) {
try {
@@ -1121,7 +1139,7 @@ public class ViewTest implements Serializable {
}
}));
- // Pass at least one value through to guarantee that DoFn executes.
+ // Pass at least one value through to guarantee that OldDoFn executes.
PAssert.that(output).containsInAnyOrder(KV.of("apple", 1));
pipeline.run();
@@ -1139,12 +1157,14 @@ public class ViewTest implements Serializable {
PCollection<KV<String, Integer>> output =
pipeline.apply("CreateMainInput", Create.of("apple", "banana", "blackberry"))
- .apply("Output", ParDo.withSideInputs(view).of(new DoFn<String, KV<String, Integer>>() {
- @Override
- public void processElement(ProcessContext c) {
- c.output(KV.of(c.element(), c.sideInput(view).get(c.element().substring(0, 1))));
- }
- }));
+ .apply("Output",
+ ParDo.withSideInputs(view).of(new OldDoFn<String, KV<String, Integer>>() {
+ @Override
+ public void processElement(ProcessContext c) {
+ c.output(KV
+ .of(c.element(), c.sideInput(view).get(c.element().substring(0, 1))));
+ }
+ }));
PAssert.that(output).containsInAnyOrder(
KV.of("apple", 21), KV.of("banana", 3), KV.of("blackberry", 3));
@@ -1173,7 +1193,7 @@ public class ViewTest implements Serializable {
TimestampedValue.of("C", new Instant(7))))
.apply("WindowMainInput", Window.<String>into(FixedWindows.of(Duration.millis(10))))
.apply("OutputMainAndSideInputs", ParDo.withSideInputs(view).of(
- new DoFn<String, String>() {
+ new OldDoFn<String, String>() {
@Override
public void processElement(ProcessContext c) {
c.output(c.element() + c.sideInput(view));
@@ -1206,7 +1226,7 @@ public class ViewTest implements Serializable {
TimestampedValue.of("C", new Instant(7))))
.apply("WindowMainInput", Window.<String>into(FixedWindows.of(Duration.millis(10))))
.apply("OutputMainAndSideInputs", ParDo.withSideInputs(view).of(
- new DoFn<String, String>() {
+ new OldDoFn<String, String>() {
@Override
public void processElement(ProcessContext c) {
c.output(c.element() + c.sideInput(view));
@@ -1237,7 +1257,7 @@ public class ViewTest implements Serializable {
TimestampedValue.of("C", new Instant(7))))
.apply("WindowMainInput", Window.<String>into(FixedWindows.of(Duration.millis(10))))
.apply("OutputMainAndSideInputs", ParDo.withSideInputs(view).of(
- new DoFn<String, String>() {
+ new OldDoFn<String, String>() {
@Override
public void processElement(ProcessContext c) {
c.output(c.element() + c.sideInput(view));
@@ -1267,7 +1287,7 @@ public class ViewTest implements Serializable {
p.apply("CreateMainInput", Create.of(""))
.apply(
"OutputMainAndSideInputs",
- ParDo.withSideInputs(view).of(new DoFn<String, String>() {
+ ParDo.withSideInputs(view).of(new OldDoFn<String, String>() {
@Override
public void processElement(ProcessContext c) {
c.output(c.element() + c.sideInput(view));
@@ -1285,7 +1305,7 @@ public class ViewTest implements Serializable {
Pipeline pipeline = TestPipeline.create();
final PCollectionView<Iterable<Integer>> view1 =
pipeline.apply("CreateVoid1", Create.of((Void) null).withCoder(VoidCoder.of()))
- .apply("OutputOneInteger", ParDo.of(new DoFn<Void, Integer>() {
+ .apply("OutputOneInteger", ParDo.of(new OldDoFn<Void, Integer>() {
@Override
public void processElement(ProcessContext c) {
c.output(17);
@@ -1297,7 +1317,7 @@ public class ViewTest implements Serializable {
pipeline.apply("CreateVoid2", Create.of((Void) null).withCoder(VoidCoder.of()))
.apply(
"OutputSideInput",
- ParDo.withSideInputs(view1).of(new DoFn<Void, Iterable<Integer>>() {
+ ParDo.withSideInputs(view1).of(new OldDoFn<Void, Iterable<Integer>>() {
@Override
public void processElement(ProcessContext c) {
c.output(c.sideInput(view1));
@@ -1307,8 +1327,8 @@ public class ViewTest implements Serializable {
PCollection<Integer> output =
pipeline.apply("CreateVoid3", Create.of((Void) null).withCoder(VoidCoder.of()))
- .apply(
- "ReadIterableSideInput", ParDo.withSideInputs(view2).of(new DoFn<Void, Integer>() {
+ .apply("ReadIterableSideInput",
+ ParDo.withSideInputs(view2).of(new OldDoFn<Void, Integer>() {
@Override
public void processElement(ProcessContext c) {
for (Iterable<Integer> input : c.sideInput(view2)) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsTest.java
index ac67bb4..d2ba452 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsTest.java
@@ -65,9 +65,9 @@ public class WithTimestampsTest implements Serializable {
.apply(WithTimestamps.of(timestampFn));
PCollection<KV<String, Instant>> timestampedVals =
- timestamped.apply(ParDo.of(new DoFn<String, KV<String, Instant>>() {
+ timestamped.apply(ParDo.of(new OldDoFn<String, KV<String, Instant>>() {
@Override
- public void processElement(DoFn<String, KV<String, Instant>>.ProcessContext c)
+ public void processElement(OldDoFn<String, KV<String, Instant>>.ProcessContext c)
throws Exception {
c.output(KV.of(c.element(), c.timestamp()));
}
@@ -150,9 +150,9 @@ public class WithTimestampsTest implements Serializable {
WithTimestamps.of(backInTimeFn).withAllowedTimestampSkew(skew.plus(100L)));
PCollection<KV<String, Instant>> timestampedVals =
- timestampedWithSkew.apply(ParDo.of(new DoFn<String, KV<String, Instant>>() {
+ timestampedWithSkew.apply(ParDo.of(new OldDoFn<String, KV<String, Instant>>() {
@Override
- public void processElement(DoFn<String, KV<String, Instant>>.ProcessContext c)
+ public void processElement(OldDoFn<String, KV<String, Instant>>.ProcessContext c)
throws Exception {
c.output(KV.of(c.element(), c.timestamp()));
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluatorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluatorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluatorTest.java
index ce32b7d..c1848c6 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluatorTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataEvaluatorTest.java
@@ -24,7 +24,7 @@ import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertThat;
import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PBegin;
@@ -50,7 +50,7 @@ public class DisplayDataEvaluatorTest implements Serializable {
new PTransform<PCollection<String>, POutput> () {
@Override
public PCollection<String> apply(PCollection<String> input) {
- return input.apply(ParDo.of(new DoFn<String, String>() {
+ return input.apply(ParDo.of(new OldDoFn<String, String>() {
@Override
public void processElement(ProcessContext c) throws Exception {
c.output(c.element());
@@ -79,7 +79,7 @@ public class DisplayDataEvaluatorTest implements Serializable {
@Test
public void testPrimitiveTransform() {
PTransform<? super PCollection<Integer>, ? super PCollection<Integer>> myTransform = ParDo.of(
- new DoFn<Integer, Integer>() {
+ new OldDoFn<Integer, Integer>() {
@Override
public void processElement(ProcessContext c) throws Exception {}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchersTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchersTest.java
index 07029e9..fa44390 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchersTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataMatchersTest.java
@@ -22,6 +22,7 @@ import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasName
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasType;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasValue;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includesDisplayDataFrom;
+
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;