You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/08/04 01:26:13 UTC
[05/19] incubator-beam git commit: Rename DoFn to OldDoFn
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java
index cafe873..517f968 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/display/DisplayDataTest.java
@@ -24,6 +24,7 @@ import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasName
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasType;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasValue;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includesDisplayDataFrom;
+
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.everyItem;
@@ -40,7 +41,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.display.DisplayData.Builder;
@@ -54,7 +55,6 @@ import com.google.common.testing.EqualsTester;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
-
import org.hamcrest.CustomTypeSafeMatcher;
import org.hamcrest.FeatureMatcher;
import org.hamcrest.Matcher;
@@ -1053,7 +1053,7 @@ public class DisplayDataTest implements Serializable {
private static class IdentityTransform<T> extends PTransform<PCollection<T>, PCollection<T>> {
@Override
public PCollection<T> apply(PCollection<T> input) {
- return input.apply(ParDo.of(new DoFn<T, T>() {
+ return input.apply(ParDo.of(new OldDoFn<T, T>() {
@Override
public void processElement(ProcessContext c) throws Exception {
c.output(c.element());
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java
index 10a2a7e..97667a3 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java
@@ -29,9 +29,9 @@ import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.RunnableOnService;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.RequiresWindowAccess;
import org.apache.beam.sdk.transforms.DoFnTester;
+import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.OldDoFn.RequiresWindowAccess;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -85,8 +85,8 @@ public class CoGroupByKeyTest implements Serializable {
.withCoder(KvCoder.of(BigEndianIntegerCoder.of(), StringUtf8Coder.of())));
}
return input
- .apply("Identity" + name, ParDo.of(new DoFn<KV<Integer, String>,
- KV<Integer, String>>() {
+ .apply("Identity" + name, ParDo.of(new OldDoFn<KV<Integer, String>,
+ KV<Integer, String>>() {
@Override
public void processElement(ProcessContext c) {
c.output(c.element());
@@ -313,11 +313,11 @@ public class CoGroupByKeyTest implements Serializable {
}
/**
- * A DoFn used in testCoGroupByKeyWithWindowing(), to test processing the
+ * An OldDoFn used in testCoGroupByKeyWithWindowing(), to test processing the
* results of a CoGroupByKey.
*/
private static class ClickOfPurchaseFn extends
- DoFn<KV<Integer, CoGbkResult>, KV<String, String>> implements RequiresWindowAccess {
+ OldDoFn<KV<Integer, CoGbkResult>, KV<String, String>> implements RequiresWindowAccess {
private final TupleTag<String> clicksTag;
private final TupleTag<String> purchasesTag;
@@ -347,11 +347,11 @@ public class CoGroupByKeyTest implements Serializable {
/**
- * A DoFn used in testCoGroupByKeyHandleResults(), to test processing the
+ * An OldDoFn used in testCoGroupByKeyHandleResults(), to test processing the
* results of a CoGroupByKey.
*/
private static class CorrelatePurchaseCountForAddressesWithoutNamesFn extends
- DoFn<KV<Integer, CoGbkResult>, KV<String, Integer>> {
+ OldDoFn<KV<Integer, CoGbkResult>, KV<String, Integer>> {
private final TupleTag<String> purchasesTag;
private final TupleTag<String> addressesTag;
@@ -401,7 +401,7 @@ public class CoGroupByKeyTest implements Serializable {
}
/**
- * Tests that the consuming DoFn
+ * Tests that the consuming OldDoFn
* (CorrelatePurchaseCountForAddressesWithoutNamesFn) performs as expected.
*/
@SuppressWarnings("unchecked")
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/NeverTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/NeverTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/NeverTest.java
index fb2b4d5..ed64f84 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/NeverTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/NeverTest.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertThat;
import org.apache.beam.sdk.util.TriggerTester;
import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester;
import org.apache.beam.sdk.values.TimestampedValue;
+
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Before;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
index 76bc038..27d2539 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
@@ -36,8 +36,8 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.testing.RunnableOnService;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.util.WindowingStrategy;
@@ -199,7 +199,7 @@ public class WindowTest implements Serializable {
.apply(GroupByKey.<Integer, String>create())
.apply(
ParDo.of(
- new DoFn<KV<Integer, Iterable<String>>, Void>() {
+ new OldDoFn<KV<Integer, Iterable<String>>, Void>() {
@Override
public void processElement(ProcessContext c) throws Exception {
assertThat(
@@ -231,7 +231,7 @@ public class WindowTest implements Serializable {
.apply(Window.<KV<Integer, String>>into(FixedWindows.of(Duration.standardMinutes(10)))
.withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow()))
.apply(GroupByKey.<Integer, String>create())
- .apply(ParDo.of(new DoFn<KV<Integer, Iterable<String>>, Void>() {
+ .apply(ParDo.of(new OldDoFn<KV<Integer, Iterable<String>>, Void>() {
@Override
public void processElement(ProcessContext c) throws Exception {
assertThat(c.timestamp(), equalTo(new Instant(10 * 60 * 1000 - 1)));
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
index c1e092a..622a277 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
@@ -26,9 +26,9 @@ import org.apache.beam.sdk.testing.RunnableOnService;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.RequiresWindowAccess;
import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.OldDoFn.RequiresWindowAccess;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
@@ -59,7 +59,7 @@ public class WindowingTest implements Serializable {
private static class WindowedCount extends PTransform<PCollection<String>, PCollection<String>> {
private final class FormatCountsDoFn
- extends DoFn<KV<String, Long>, String> implements RequiresWindowAccess {
+ extends OldDoFn<KV<String, Long>, String> implements RequiresWindowAccess {
@Override
public void processElement(ProcessContext c) {
c.output(c.element().getKey() + ":" + c.element().getValue()
@@ -234,8 +234,8 @@ public class WindowingTest implements Serializable {
p.run();
}
- /** A DoFn that tokenizes lines of text into individual words. */
- static class ExtractWordsWithTimestampsFn extends DoFn<String, String> {
+ /** An OldDoFn that tokenizes lines of text into individual words. */
+ static class ExtractWordsWithTimestampsFn extends OldDoFn<String, String> {
@Override
public void processElement(ProcessContext c) {
String[] words = c.element().split("[^a-zA-Z0-9']+");
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BucketingFunctionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BucketingFunctionTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BucketingFunctionTest.java
index c808b4d..ee5a2b3 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BucketingFunctionTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BucketingFunctionTest.java
@@ -18,10 +18,12 @@
package org.apache.beam.sdk.util;
-import org.apache.beam.sdk.transforms.Combine;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+
+import org.apache.beam.sdk.transforms.Combine;
+
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/util/MovingFunctionTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/MovingFunctionTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/MovingFunctionTest.java
index 2cbc20e..b95f235 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/MovingFunctionTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/MovingFunctionTest.java
@@ -18,10 +18,12 @@
package org.apache.beam.sdk.util;
-import org.apache.beam.sdk.transforms.Combine;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+
+import org.apache.beam.sdk.transforms.Combine;
+
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SerializableUtilsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SerializableUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SerializableUtilsTest.java
index d9e7593..30406fc 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SerializableUtilsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SerializableUtilsTest.java
@@ -27,7 +27,6 @@ import com.google.common.collect.ImmutableList;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
-
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SerializerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SerializerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SerializerTest.java
index 6c5d0bd..f6bacc4 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SerializerTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/SerializerTest.java
@@ -25,7 +25,6 @@ import static org.apache.beam.sdk.util.Structs.addString;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
-
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/util/StringUtilsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/StringUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/StringUtilsTest.java
index 7e68df9..e87bbee 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/StringUtilsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/StringUtilsTest.java
@@ -59,12 +59,12 @@ public class StringUtilsTest {
/**
* Inner class for simple name test.
*/
- private class EmbeddedDoFn {
+ private class EmbeddedOldDoFn {
- private class DeeperEmbeddedDoFn extends EmbeddedDoFn {}
+ private class DeeperEmbeddedOldDoFn extends EmbeddedOldDoFn {}
- private EmbeddedDoFn getEmbedded() {
- return new DeeperEmbeddedDoFn();
+ private EmbeddedOldDoFn getEmbedded() {
+ return new DeeperEmbeddedOldDoFn();
}
}
@@ -93,22 +93,22 @@ public class StringUtilsTest {
@Test
public void testSimpleName() {
assertEquals("Embedded",
- StringUtils.approximateSimpleName(EmbeddedDoFn.class));
+ StringUtils.approximateSimpleName(EmbeddedOldDoFn.class));
}
@Test
public void testAnonSimpleName() throws Exception {
thrown.expect(IllegalArgumentException.class);
- EmbeddedDoFn anon = new EmbeddedDoFn(){};
+ EmbeddedOldDoFn anon = new EmbeddedOldDoFn(){};
StringUtils.approximateSimpleName(anon.getClass());
}
@Test
public void testNestedSimpleName() {
- EmbeddedDoFn fn = new EmbeddedDoFn();
- EmbeddedDoFn inner = fn.getEmbedded();
+ EmbeddedOldDoFn fn = new EmbeddedOldDoFn();
+ EmbeddedOldDoFn inner = fn.getEmbedded();
assertEquals("DeeperEmbedded", StringUtils.approximateSimpleName(inner.getClass()));
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java
index b321c8f..4892bbd 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.util;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
+
import static org.junit.Assert.assertTrue;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/util/common/CounterTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/common/CounterTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/common/CounterTest.java
index fb002de..79f0cb7 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/common/CounterTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/common/CounterTest.java
@@ -31,6 +31,7 @@ import static org.junit.Assert.assertTrue;
import org.apache.beam.sdk.util.common.Counter.CommitState;
import org.apache.beam.sdk.util.common.Counter.CounterMean;
+
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java
index 9a8ab30..547c778 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PCollectionTupleTest.java
@@ -26,7 +26,7 @@ import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.RunnableOnService;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.PCollection.IsBounded;
@@ -75,7 +75,7 @@ public final class PCollectionTupleTest implements Serializable {
.apply(Create.of(inputs));
PCollectionTuple outputs = mainInput.apply(ParDo
- .of(new DoFn<Integer, Integer>() {
+ .of(new OldDoFn<Integer, Integer>() {
@Override
public void processElement(ProcessContext c) {
c.sideOutput(sideOutputTag, c.element());
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java
index ba5dffb..c525cf1 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypedPValueTest.java
@@ -26,7 +26,7 @@ import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.junit.Rule;
@@ -44,7 +44,7 @@ public class TypedPValueTest {
@Rule
public ExpectedException thrown = ExpectedException.none();
- private static class IdentityDoFn extends DoFn<Integer, Integer> {
+ private static class IdentityDoFn extends OldDoFn<Integer, Integer> {
private static final long serialVersionUID = 0;
@Override
public void processElement(ProcessContext c) throws Exception {
@@ -129,7 +129,7 @@ public class TypedPValueTest {
static class EmptyClass {
}
- private static class EmptyClassDoFn extends DoFn<Integer, EmptyClass> {
+ private static class EmptyClassDoFn extends OldDoFn<Integer, EmptyClass> {
private static final long serialVersionUID = 0;
@Override
public void processElement(ProcessContext c) throws Exception {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java b/sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
index 72abaea..88836f9 100644
--- a/sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
+++ b/sdks/java/extensions/join-library/src/main/java/org/apache/beam/sdk/extensions/joinlibrary/Join.java
@@ -20,7 +20,7 @@ package org.apache.beam.sdk.extensions.joinlibrary;
import static com.google.common.base.Preconditions.checkNotNull;
import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.transforms.join.CoGroupByKey;
@@ -59,7 +59,7 @@ public class Join {
.apply(CoGroupByKey.<K>create());
return coGbkResultCollection.apply(ParDo.of(
- new DoFn<KV<K, CoGbkResult>, KV<K, KV<V1, V2>>>() {
+ new OldDoFn<KV<K, CoGbkResult>, KV<K, KV<V1, V2>>>() {
@Override
public void processElement(ProcessContext c) {
KV<K, CoGbkResult> e = c.element();
@@ -108,7 +108,7 @@ public class Join {
.apply(CoGroupByKey.<K>create());
return coGbkResultCollection.apply(ParDo.of(
- new DoFn<KV<K, CoGbkResult>, KV<K, KV<V1, V2>>>() {
+ new OldDoFn<KV<K, CoGbkResult>, KV<K, KV<V1, V2>>>() {
@Override
public void processElement(ProcessContext c) {
KV<K, CoGbkResult> e = c.element();
@@ -161,7 +161,7 @@ public class Join {
.apply(CoGroupByKey.<K>create());
return coGbkResultCollection.apply(ParDo.of(
- new DoFn<KV<K, CoGbkResult>, KV<K, KV<V1, V2>>>() {
+ new OldDoFn<KV<K, CoGbkResult>, KV<K, KV<V1, V2>>>() {
@Override
public void processElement(ProcessContext c) {
KV<K, CoGbkResult> e = c.element();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 76f7079..9fccbf9 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -44,7 +44,7 @@ import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -324,7 +324,7 @@ public class BigQueryIO {
* <p>Each {@link TableRow} contains values indexed by column name. Here is a
* sample processing function that processes a "line" column from rows:
* <pre>{@code
- * static class ExtractWordsFn extends DoFn<TableRow, String> {
+ * static class ExtractWordsFn extends OldDoFn<TableRow, String> {
* public void processElement(ProcessContext c) {
* // Get the "line" field of the TableRow object, split it into words, and emit them.
* TableRow row = c.element();
@@ -696,7 +696,7 @@ public class BigQueryIO {
input.getPipeline()
.apply("Create(CleanupOperation)", Create.of(cleanupOperation))
.apply("Cleanup", ParDo.of(
- new DoFn<CleanupOperation, Void>() {
+ new OldDoFn<CleanupOperation, Void>() {
@Override
public void processElement(ProcessContext c)
throws Exception {
@@ -707,7 +707,7 @@ public class BigQueryIO {
return outputs.get(mainOutput);
}
- private static class IdentityFn<T> extends DoFn<T, T> {
+ private static class IdentityFn<T> extends OldDoFn<T, T> {
@Override
public void processElement(ProcessContext c) {
c.output(c.element());
@@ -1262,7 +1262,7 @@ public class BigQueryIO {
* <p>Here is a sample transform that produces TableRow values containing
* "word" and "count" columns:
* <pre>{@code
- * static class FormatCountsFn extends DoFn<KV<String, Long>, TableRow> {
+ * static class FormatCountsFn extends OldDoFn<KV<String, Long>, TableRow> {
* public void processElement(ProcessContext c) {
* TableRow row = new TableRow()
* .set("word", c.element().getKey())
@@ -2011,11 +2011,11 @@ public class BigQueryIO {
/////////////////////////////////////////////////////////////////////////////
/**
- * Implementation of DoFn to perform streaming BigQuery write.
+ * Implementation of OldDoFn to perform streaming BigQuery write.
*/
@SystemDoFnInternal
private static class StreamingWriteFn
- extends DoFn<KV<ShardedKey<String>, TableRowInfo>, Void> {
+ extends OldDoFn<KV<ShardedKey<String>, TableRowInfo>, Void> {
/** TableSchema in JSON. Use String to make the class Serializable. */
private final String jsonTableSchema;
@@ -2248,8 +2248,8 @@ public class BigQueryIO {
* id is created by concatenating this randomUUID with a sequential number.
*/
private static class TagWithUniqueIdsAndTable
- extends DoFn<TableRow, KV<ShardedKey<String>, TableRowInfo>>
- implements DoFn.RequiresWindowAccess {
+ extends OldDoFn<TableRow, KV<ShardedKey<String>, TableRowInfo>>
+ implements OldDoFn.RequiresWindowAccess {
/** TableSpec to write to. */
private final String tableSpec;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
index f4082d4..1f77e3e 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
@@ -31,7 +31,7 @@ import org.apache.beam.sdk.io.range.ByteKeyRange;
import org.apache.beam.sdk.io.range.ByteKeyRangeTracker;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.runners.PipelineRunner;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.display.DisplayData;
@@ -512,7 +512,7 @@ public class BigtableIO {
return new BigtableServiceImpl(options);
}
- private class BigtableWriterFn extends DoFn<KV<ByteString, Iterable<Mutation>>, Void> {
+ private class BigtableWriterFn extends OldDoFn<KV<ByteString, Iterable<Mutation>>, Void> {
public BigtableWriterFn(String tableId, BigtableService bigtableService) {
this.tableId = checkNotNull(tableId, "tableId");
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3.java
index bda907a..6f3663a 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3.java
@@ -37,9 +37,9 @@ import org.apache.beam.sdk.io.Sink.Writer;
import org.apache.beam.sdk.options.GcpOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Values;
@@ -85,7 +85,6 @@ import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
-
import javax.annotation.Nullable;
/**
@@ -479,11 +478,11 @@ public class V1Beta3 {
}
/**
- * A {@link DoFn} that splits a given query into multiple sub-queries, assigns them unique keys
- * and outputs them as {@link KV}.
+ * An {@link OldDoFn} that splits a given query into multiple sub-queries, assigns them unique
+ * keys and outputs them as {@link KV}.
*/
@VisibleForTesting
- static class SplitQueryFn extends DoFn<Query, KV<Integer, Query>> {
+ static class SplitQueryFn extends OldDoFn<Query, KV<Integer, Query>> {
private final V1Beta3Options options;
// number of splits to make for a given query
private final int numSplits;
@@ -560,10 +559,10 @@ public class V1Beta3 {
}
/**
- * A {@link DoFn} that reads entities from Datastore for each query.
+ * An {@link OldDoFn} that reads entities from Datastore for each query.
*/
@VisibleForTesting
- static class ReadFn extends DoFn<Query, Entity> {
+ static class ReadFn extends OldDoFn<Query, Entity> {
private final V1Beta3Options options;
private final V1Beta3DatastoreFactory datastoreFactory;
// Datastore client
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index 00e7891..7d2df62 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -58,7 +58,7 @@ import org.apache.beam.sdk.testing.SourceTestUtils;
import org.apache.beam.sdk.testing.SourceTestUtils.ExpectedSplitOutcome;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
@@ -216,7 +216,7 @@ public class BigQueryIOTest implements Serializable {
private Object[] pollJobReturns;
private String executingProject;
// Both counts will be reset back to zeros after serialization.
- // This is a work around for DoFn's verifyUnmodified check.
+ // This is a workaround for OldDoFn's verifyUnmodified check.
private transient int startJobCallsCount;
private transient int pollJobStatusCallsCount;
@@ -546,7 +546,7 @@ public class BigQueryIOTest implements Serializable {
.apply(BigQueryIO.Read.from("non-executing-project:somedataset.sometable")
.withTestServices(fakeBqServices)
.withoutValidation())
- .apply(ParDo.of(new DoFn<TableRow, String>() {
+ .apply(ParDo.of(new OldDoFn<TableRow, String>() {
@Override
public void processElement(ProcessContext c) throws Exception {
c.output((String) c.element().get("name"));
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java
index a39d7d5..83489a5 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableWriteIT.java
@@ -23,7 +23,7 @@ import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.CountingInput;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
@@ -108,7 +108,7 @@ public class BigtableWriteIT implements Serializable {
Pipeline p = Pipeline.create(options);
p.apply(CountingInput.upTo(numRows))
- .apply(ParDo.of(new DoFn<Long, KV<ByteString, Iterable<Mutation>>>() {
+ .apply(ParDo.of(new OldDoFn<Long, KV<ByteString, Iterable<Mutation>>>() {
@Override
public void processElement(ProcessContext c) {
int index = c.element().intValue();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3TestUtil.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3TestUtil.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3TestUtil.java
index 59d91d4..daed1cb 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3TestUtil.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3TestUtil.java
@@ -27,7 +27,7 @@ import static com.google.datastore.v1beta3.client.DatastoreHelper.makeValue;
import org.apache.beam.sdk.options.GcpOptions;
import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff;
import org.apache.beam.sdk.util.RetryHttpRequestInitializer;
@@ -109,9 +109,9 @@ class V1Beta3TestUtil {
}
/**
- * A DoFn that creates entity for a long number.
+ * An OldDoFn that creates an entity for a long number.
*/
- static class CreateEntityFn extends DoFn<Long, Entity> {
+ static class CreateEntityFn extends OldDoFn<Long, Entity> {
private final String kind;
@Nullable
private final String namespace;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
index 2de933c..342c4fc 100644
--- a/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
+++ b/sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
@@ -28,7 +28,7 @@ import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.display.DisplayData;
@@ -453,7 +453,7 @@ public class JmsIO {
checkArgument((queue != null || topic != null), "Either queue or topic is required");
}
- private static class JmsWriter extends DoFn<String, Void> {
+ private static class JmsWriter extends OldDoFn<String, Void> {
private ConnectionFactory connectionFactory;
private String queue;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 3b64bd5..eb649a6 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -33,7 +33,7 @@ import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
import org.apache.beam.sdk.io.kafka.KafkaCheckpointMark.PartitionMark;
import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -550,7 +550,7 @@ public class KafkaIO {
return typedRead
.apply(begin)
.apply("Remove Kafka Metadata",
- ParDo.of(new DoFn<KafkaRecord<K, V>, KV<K, V>>() {
+ ParDo.of(new OldDoFn<KafkaRecord<K, V>, KV<K, V>>() {
@Override
public void processElement(ProcessContext ctx) {
ctx.output(ctx.element().getKV());
@@ -1315,7 +1315,7 @@ public class KafkaIO {
public PDone apply(PCollection<V> input) {
return input
.apply("Kafka values with default key",
- ParDo.of(new DoFn<V, KV<Void, V>>() {
+ ParDo.of(new OldDoFn<V, KV<Void, V>>() {
@Override
public void processElement(ProcessContext ctx) throws Exception {
ctx.output(KV.<Void, V>of(null, ctx.element()));
@@ -1326,7 +1326,7 @@ public class KafkaIO {
}
}
- private static class KafkaWriter<K, V> extends DoFn<KV<K, V>, Void> {
+ private static class KafkaWriter<K, V> extends OldDoFn<KV<K, V>, Void> {
@Override
public void startBundle(Context c) throws Exception {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
index dd93823..d7b1921 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -33,10 +33,10 @@ import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.Max;
import org.apache.beam.sdk.transforms.Min;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.RemoveDuplicates;
import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -78,7 +78,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-
import javax.annotation.Nullable;
/**
@@ -281,7 +280,7 @@ public class KafkaIOTest {
p.run();
}
- private static class ElementValueDiff extends DoFn<Long, Long> {
+ private static class ElementValueDiff extends OldDoFn<Long, Long> {
@Override
public void processElement(ProcessContext c) throws Exception {
c.output(c.element() - c.timestamp().getMillis());
@@ -309,7 +308,7 @@ public class KafkaIOTest {
p.run();
}
- private static class RemoveKafkaMetadata<K, V> extends DoFn<KafkaRecord<K, V>, KV<K, V>> {
+ private static class RemoveKafkaMetadata<K, V> extends OldDoFn<KafkaRecord<K, V>, KV<K, V>> {
@Override
public void processElement(ProcessContext ctx) throws Exception {
ctx.output(ctx.element().getKV());
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsJava8Test.java
----------------------------------------------------------------------
diff --git a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsJava8Test.java b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsJava8Test.java
index fef8d40..1141e88 100644
--- a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsJava8Test.java
+++ b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/WithTimestampsJava8Test.java
@@ -47,9 +47,9 @@ public class WithTimestampsJava8Test implements Serializable {
.apply(WithTimestamps.of((String input) -> new Instant(Long.valueOf(yearTwoThousand))));
PCollection<KV<String, Instant>> timestampedVals =
- timestamped.apply(ParDo.of(new DoFn<String, KV<String, Instant>>() {
+ timestamped.apply(ParDo.of(new OldDoFn<String, KV<String, Instant>>() {
@Override
- public void processElement(DoFn<String, KV<String, Instant>>.ProcessContext c)
+ public void processElement(OldDoFn<String, KV<String, Instant>>.ProcessContext c)
throws Exception {
c.output(KV.of(c.element(), c.timestamp()));
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/DebuggingWordCount.java
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/DebuggingWordCount.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/DebuggingWordCount.java
index c0e5b17..bc55c06 100644
--- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/DebuggingWordCount.java
+++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/DebuggingWordCount.java
@@ -25,7 +25,7 @@ import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.values.KV;
@@ -108,7 +108,7 @@ import java.util.regex.Pattern;
*/
public class DebuggingWordCount {
/** A DoFn that filters for a specific key based upon a regular expression. */
- public static class FilterTextFn extends DoFn<KV<String, Long>, KV<String, Long>> {
+ public static class FilterTextFn extends OldDoFn<KV<String, Long>, KV<String, Long>> {
/**
* Concept #1: The logger below uses the fully qualified class name of FilterTextFn
* as the logger. All log statements emitted by this logger will be referenced by this name
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/MinimalWordCount.java
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/MinimalWordCount.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/MinimalWordCount.java
index be32afa..55beb1f 100644
--- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/MinimalWordCount.java
+++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/MinimalWordCount.java
@@ -23,7 +23,7 @@ import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
@@ -82,7 +82,7 @@ public class MinimalWordCount {
// DoFn (defined in-line) on each element that tokenizes the text line into individual words.
// The ParDo returns a PCollection<String>, where each element is an individual word in
// Shakespeare's collected texts.
- .apply("ExtractWords", ParDo.of(new DoFn<String, String>() {
+ .apply("ExtractWords", ParDo.of(new OldDoFn<String, String>() {
@Override
public void processElement(ProcessContext c) {
for (String word : c.element().split("[^a-zA-Z']+")) {
@@ -98,7 +98,7 @@ public class MinimalWordCount {
.apply(Count.<String>perElement())
// Apply another ParDo transform that formats our PCollection of word counts into a printable
// string, suitable for writing to an output file.
- .apply("FormatResults", ParDo.of(new DoFn<KV<String, Long>, String>() {
+ .apply("FormatResults", ParDo.of(new OldDoFn<KV<String, Long>, String>() {
@Override
public void processElement(ProcessContext c) {
c.output(c.element().getKey() + ": " + c.element().getValue());
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WindowedWordCount.java
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WindowedWordCount.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WindowedWordCount.java
index c2defa7..ffe8b88 100644
--- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WindowedWordCount.java
+++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WindowedWordCount.java
@@ -27,7 +27,7 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
@@ -121,7 +121,7 @@ public class WindowedWordCount {
* his masterworks. Each line of the corpus will get a random associated timestamp somewhere in a
* 2-hour period.
*/
- static class AddTimestampFn extends DoFn<String, String> {
+ static class AddTimestampFn extends OldDoFn<String, String> {
private static final long RAND_RANGE = 7200000; // 2 hours in ms
@Override
@@ -137,7 +137,7 @@ public class WindowedWordCount {
}
/** A DoFn that converts a Word and Count into a BigQuery table row. */
- static class FormatAsTableRowFn extends DoFn<KV<String, Long>, TableRow> {
+ static class FormatAsTableRowFn extends OldDoFn<KV<String, Long>, TableRow> {
@Override
public void processElement(ProcessContext c) {
TableRow row = new TableRow()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WordCount.java
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WordCount.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WordCount.java
index 803e800..5432036 100644
--- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WordCount.java
+++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WordCount.java
@@ -27,7 +27,7 @@ import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
@@ -95,7 +95,7 @@ public class WordCount {
* of-line. This DoFn tokenizes lines of text into individual words; we pass it to a ParDo in the
* pipeline.
*/
- static class ExtractWordsFn extends DoFn<String, String> {
+ static class ExtractWordsFn extends OldDoFn<String, String> {
private final Aggregator<Long, Long> emptyLines =
createAggregator("emptyLines", new Sum.SumLongFn());
@@ -118,7 +118,7 @@ public class WordCount {
}
/** A DoFn that converts a Word and Count into a printable string. */
- public static class FormatAsTextFn extends DoFn<KV<String, Long>, String> {
+ public static class FormatAsTextFn extends OldDoFn<KV<String, Long>, String> {
@Override
public void processElement(ProcessContext c) {
c.output(c.element().getKey() + ": " + c.element().getValue());
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/PubsubFileInjector.java
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/PubsubFileInjector.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/PubsubFileInjector.java
index 5c182b2..9b347da 100644
--- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/PubsubFileInjector.java
+++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/PubsubFileInjector.java
@@ -24,7 +24,7 @@ import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.IntraBundleParallelization;
import org.apache.beam.sdk.util.Transport;
@@ -72,7 +72,7 @@ public class PubsubFileInjector {
}
/** A DoFn that publishes non-empty lines to Google Cloud PubSub. */
- public static class Bound extends DoFn<String, Void> {
+ public static class Bound extends OldDoFn<String, Void> {
private final String outputTopic;
private final String timestampLabelKey;
public transient Pubsub pubsub;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/maven-archetypes/starter/src/main/resources/archetype-resources/src/main/java/StarterPipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/starter/src/main/resources/archetype-resources/src/main/java/StarterPipeline.java b/sdks/java/maven-archetypes/starter/src/main/resources/archetype-resources/src/main/java/StarterPipeline.java
index 9a75bb7..6a1c41b 100644
--- a/sdks/java/maven-archetypes/starter/src/main/resources/archetype-resources/src/main/java/StarterPipeline.java
+++ b/sdks/java/maven-archetypes/starter/src/main/resources/archetype-resources/src/main/java/StarterPipeline.java
@@ -20,7 +20,7 @@ package ${package};
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.slf4j.Logger;
@@ -50,13 +50,13 @@ public class StarterPipeline {
PipelineOptionsFactory.fromArgs(args).withValidation().create());
p.apply(Create.of("Hello", "World"))
- .apply(ParDo.of(new DoFn<String, String>() {
+ .apply(ParDo.of(new OldDoFn<String, String>() {
@Override
public void processElement(ProcessContext c) {
c.output(c.element().toUpperCase());
}
}))
- .apply(ParDo.of(new DoFn<String, Void>() {
+ .apply(ParDo.of(new OldDoFn<String, Void>() {
@Override
public void processElement(ProcessContext c) {
LOG.info(c.element());
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/src/main/java/it/pkg/StarterPipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/src/main/java/it/pkg/StarterPipeline.java b/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/src/main/java/it/pkg/StarterPipeline.java
index 8c71d9d..7c13350 100644
--- a/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/src/main/java/it/pkg/StarterPipeline.java
+++ b/sdks/java/maven-archetypes/starter/src/test/resources/projects/basic/reference/src/main/java/it/pkg/StarterPipeline.java
@@ -20,7 +20,7 @@ package it.pkg;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.slf4j.Logger;
@@ -50,13 +50,13 @@ public class StarterPipeline {
PipelineOptionsFactory.fromArgs(args).withValidation().create());
p.apply(Create.of("Hello", "World"))
- .apply(ParDo.of(new DoFn<String, String>() {
+ .apply(ParDo.of(new OldDoFn<String, String>() {
@Override
public void processElement(ProcessContext c) {
c.output(c.element().toUpperCase());
}
}))
- .apply(ParDo.of(new DoFn<String, Void>() {
+ .apply(ParDo.of(new OldDoFn<String, Void>() {
@Override
public void processElement(ProcessContext c) {
LOG.info(c.element());
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnReflectorBenchmark.java
----------------------------------------------------------------------
diff --git a/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnReflectorBenchmark.java b/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnReflectorBenchmark.java
index f1dfbb9..0da75f4 100644
--- a/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnReflectorBenchmark.java
+++ b/sdks/java/microbenchmarks/src/main/java/org/apache/beam/sdk/microbenchmarks/transforms/DoFnReflectorBenchmark.java
@@ -20,11 +20,11 @@ package org.apache.beam.sdk.microbenchmarks.transforms;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnReflector;
import org.apache.beam.sdk.transforms.DoFnReflector.DoFnInvoker;
import org.apache.beam.sdk.transforms.DoFnWithContext;
import org.apache.beam.sdk.transforms.DoFnWithContext.ExtraContextFactory;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowingInternals;
@@ -40,7 +40,7 @@ import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;
/**
- * Benchmarks for {@link DoFn} and {@link DoFnWithContext} invocations, specifically
+ * Benchmarks for {@link OldDoFn} and {@link DoFnWithContext} invocations, specifically
* for measuring the overhead of {@link DoFnReflector}.
*/
@State(Scope.Benchmark)
@@ -50,7 +50,7 @@ public class DoFnReflectorBenchmark {
private static final String ELEMENT = "some string to use for testing";
- private DoFn<String, String> doFn = new UpperCaseDoFn();
+ private OldDoFn<String, String> doFn = new UpperCaseDoFn();
private DoFnWithContext<String, String> doFnWithContext = new UpperCaseDoFnWithContext();
private StubDoFnProcessContext stubDoFnContext = new StubDoFnProcessContext(doFn, ELEMENT);
@@ -71,7 +71,7 @@ public class DoFnReflectorBenchmark {
};
private DoFnReflector doFnReflector;
- private DoFn<String, String> adaptedDoFnWithContext;
+ private OldDoFn<String, String> adaptedDoFnWithContext;
private DoFnInvoker<String, String> invoker;
@@ -100,7 +100,7 @@ public class DoFnReflectorBenchmark {
return stubDoFnWithContextContext.output;
}
- private static class UpperCaseDoFn extends DoFn<String, String> {
+ private static class UpperCaseDoFn extends OldDoFn<String, String> {
@Override
public void processElement(ProcessContext c) throws Exception {
@@ -116,12 +116,12 @@ public class DoFnReflectorBenchmark {
}
}
- private static class StubDoFnProcessContext extends DoFn<String, String>.ProcessContext {
+ private static class StubDoFnProcessContext extends OldDoFn<String, String>.ProcessContext {
private final String element;
private String output;
- public StubDoFnProcessContext(DoFn<String, String> fn, String element) {
+ public StubDoFnProcessContext(OldDoFn<String, String> fn, String element) {
fn.super();
this.element = element;
}