You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/08/04 01:26:15 UTC
[07/19] incubator-beam git commit: Rename DoFn to OldDoFn
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java
index 77c857c..7917aec 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java
@@ -23,8 +23,8 @@ import static com.google.common.base.Preconditions.checkNotNull;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.util.VarInt;
import com.google.common.base.MoreObjects;
@@ -38,8 +38,8 @@ import java.util.Objects;
/**
* Provides information about the pane an element belongs to. Every pane is implicitly associated
* with a window. Panes are observable only via the
- * {@link org.apache.beam.sdk.transforms.DoFn.ProcessContext#pane} method of the context
- * passed to a {@link DoFn#processElement} overridden method.
+ * {@link OldDoFn.ProcessContext#pane} method of the context
+ * passed to a {@link OldDoFn#processElement} overridden method.
*
* <p>Note: This does not uniquely identify a pane, and should not be used for comparisons.
*/
@@ -74,8 +74,8 @@ public final class PaneInfo {
* definitions:
* <ol>
* <li>We'll call a pipeline 'simple' if it does not use
- * {@link org.apache.beam.sdk.transforms.DoFn.Context#outputWithTimestamp} in
- * any {@code DoFn}, and it uses the same
+ * {@link OldDoFn.Context#outputWithTimestamp} in
+ * any {@code OldDoFn}, and it uses the same
* {@link org.apache.beam.sdk.transforms.windowing.Window.Bound#withAllowedLateness}
* argument value on all windows (or uses the default of {@link org.joda.time.Duration#ZERO}).
* <li>We'll call an element 'locally late', from the point of view of a computation on a
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
index fe8b66f..03ff481 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
@@ -21,8 +21,8 @@ import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
-import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.display.DisplayData;
@@ -645,7 +645,7 @@ public class Window {
// We first apply a (trivial) transform to the input PCollection to produce a new
// PCollection. This ensures that we don't modify the windowing strategy of the input
// which may be used elsewhere.
- .apply("Identity", ParDo.of(new DoFn<T, T>() {
+ .apply("Identity", ParDo.of(new OldDoFn<T, T>() {
@Override public void processElement(ProcessContext c) {
c.output(c.element());
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BaseExecutionContext.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BaseExecutionContext.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BaseExecutionContext.java
index a62444f..dd36367 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BaseExecutionContext.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BaseExecutionContext.java
@@ -107,7 +107,7 @@ public abstract class BaseExecutionContext<T extends ExecutionContext.StepContex
/**
* Hook for subclasses to implement that will be called whenever
- * {@link org.apache.beam.sdk.transforms.DoFn.Context#output}
+ * {@link OldDoFn.Context#output}
* is called.
*/
@Override
@@ -115,7 +115,7 @@ public abstract class BaseExecutionContext<T extends ExecutionContext.StepContex
/**
* Hook for subclasses to implement that will be called whenever
- * {@link org.apache.beam.sdk.transforms.DoFn.Context#sideOutput}
+ * {@link OldDoFn.Context#sideOutput}
* is called.
*/
@Override
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BucketingFunction.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BucketingFunction.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BucketingFunction.java
index ce35c24..e14aec8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BucketingFunction.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/BucketingFunction.java
@@ -21,6 +21,7 @@ package org.apache.beam.sdk.util;
import static com.google.common.base.Preconditions.checkState;
import org.apache.beam.sdk.transforms.Combine;
+
import java.util.HashMap;
import java.util.Map;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineContextFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineContextFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineContextFactory.java
index f73fae3..149d276 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineContextFactory.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineContextFactory.java
@@ -19,7 +19,7 @@ package org.apache.beam.sdk.util;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.CombineWithContext.Context;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.state.StateContext;
import org.apache.beam.sdk.values.PCollectionView;
@@ -49,9 +49,9 @@ public class CombineContextFactory {
}
/**
- * Returns a {@code Combine.Context} that wraps a {@code DoFn.ProcessContext}.
+ * Returns a {@code Combine.Context} that wraps an {@code OldDoFn.ProcessContext}.
*/
- public static Context createFromProcessContext(final DoFn<?, ?>.ProcessContext c) {
+ public static Context createFromProcessContext(final OldDoFn<?, ?>.ProcessContext c) {
return new Context() {
@Override
public PipelineOptions getPipelineOptions() {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExecutionContext.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExecutionContext.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExecutionContext.java
index 01bde82..1c2f554 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExecutionContext.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExecutionContext.java
@@ -42,14 +42,14 @@ public interface ExecutionContext {
/**
* Hook for subclasses to implement that will be called whenever
- * {@link org.apache.beam.sdk.transforms.DoFn.Context#output}
+ * {@link OldDoFn.Context#output}
* is called.
*/
void noteOutput(WindowedValue<?> output);
/**
* Hook for subclasses to implement that will be called whenever
- * {@link org.apache.beam.sdk.transforms.DoFn.Context#sideOutput}
+ * {@link OldDoFn.Context#sideOutput}
* is called.
*/
void noteSideOutput(TupleTag<?> tag, WindowedValue<?> output);
@@ -71,14 +71,14 @@ public interface ExecutionContext {
/**
* Hook for subclasses to implement that will be called whenever
- * {@link org.apache.beam.sdk.transforms.DoFn.Context#output}
+ * {@link OldDoFn.Context#output}
* is called.
*/
void noteOutput(WindowedValue<?> output);
/**
* Hook for subclasses to implement that will be called whenever
- * {@link org.apache.beam.sdk.transforms.DoFn.Context#sideOutput}
+ * {@link OldDoFn.Context#sideOutput}
* is called.
*/
void noteSideOutput(TupleTag<?> tag, WindowedValue<?> output);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MovingFunction.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MovingFunction.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MovingFunction.java
index 96802ae..eb0a91a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MovingFunction.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MovingFunction.java
@@ -21,6 +21,7 @@ package org.apache.beam.sdk.util;
import static com.google.common.base.Preconditions.checkArgument;
import org.apache.beam.sdk.transforms.Combine;
+
import java.util.Arrays;
/**
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PerKeyCombineFnRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PerKeyCombineFnRunner.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PerKeyCombineFnRunner.java
index 9dc4f68..ae3d391 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PerKeyCombineFnRunner.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PerKeyCombineFnRunner.java
@@ -19,7 +19,7 @@ package org.apache.beam.sdk.util;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.CombineFnBase.PerKeyCombineFn;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import java.io.Serializable;
@@ -43,62 +43,62 @@ public interface PerKeyCombineFnRunner<K, InputT, AccumT, OutputT> extends Seria
/////////////////////////////////////////////////////////////////////////////
/**
- * Forwards the call to a {@link PerKeyCombineFn} to create the accumulator in a {@link DoFn}.
+ * Forwards the call to a {@link PerKeyCombineFn} to create the accumulator in an {@link OldDoFn}.
*
- * <p>It constructs a {@code CombineWithContext.Context} from {@code DoFn.ProcessContext}
+ * <p>It constructs a {@code CombineWithContext.Context} from {@code OldDoFn.ProcessContext}
* if it is required.
*/
- public AccumT createAccumulator(K key, DoFn<?, ?>.ProcessContext c);
+ public AccumT createAccumulator(K key, OldDoFn<?, ?>.ProcessContext c);
/**
- * Forwards the call to a {@link PerKeyCombineFn} to add the input in a {@link DoFn}.
+ * Forwards the call to a {@link PerKeyCombineFn} to add the input in an {@link OldDoFn}.
*
- * <p>It constructs a {@code CombineWithContext.Context} from {@code DoFn.ProcessContext}
+ * <p>It constructs a {@code CombineWithContext.Context} from {@code OldDoFn.ProcessContext}
* if it is required.
*/
- public AccumT addInput(K key, AccumT accumulator, InputT input, DoFn<?, ?>.ProcessContext c);
+ public AccumT addInput(K key, AccumT accumulator, InputT input, OldDoFn<?, ?>.ProcessContext c);
/**
- * Forwards the call to a {@link PerKeyCombineFn} to merge accumulators in a {@link DoFn}.
+ * Forwards the call to a {@link PerKeyCombineFn} to merge accumulators in an {@link OldDoFn}.
*
- * <p>It constructs a {@code CombineWithContext.Context} from {@code DoFn.ProcessContext}
+ * <p>It constructs a {@code CombineWithContext.Context} from {@code OldDoFn.ProcessContext}
* if it is required.
*/
public AccumT mergeAccumulators(
- K key, Iterable<AccumT> accumulators, DoFn<?, ?>.ProcessContext c);
+ K key, Iterable<AccumT> accumulators, OldDoFn<?, ?>.ProcessContext c);
/**
- * Forwards the call to a {@link PerKeyCombineFn} to extract the output in a {@link DoFn}.
+ * Forwards the call to a {@link PerKeyCombineFn} to extract the output in an {@link OldDoFn}.
*
- * <p>It constructs a {@code CombineWithContext.Context} from {@code DoFn.ProcessContext}
+ * <p>It constructs a {@code CombineWithContext.Context} from {@code OldDoFn.ProcessContext}
* if it is required.
*/
- public OutputT extractOutput(K key, AccumT accumulator, DoFn<?, ?>.ProcessContext c);
+ public OutputT extractOutput(K key, AccumT accumulator, OldDoFn<?, ?>.ProcessContext c);
/**
- * Forwards the call to a {@link PerKeyCombineFn} to compact the accumulator in a {@link DoFn}.
+ * Forwards the call to a {@link PerKeyCombineFn} to compact the accumulator in an {@link OldDoFn}.
*
- * <p>It constructs a {@code CombineWithContext.Context} from {@code DoFn.ProcessContext}
+ * <p>It constructs a {@code CombineWithContext.Context} from {@code OldDoFn.ProcessContext}
* if it is required.
*/
- public AccumT compact(K key, AccumT accumulator, DoFn<?, ?>.ProcessContext c);
+ public AccumT compact(K key, AccumT accumulator, OldDoFn<?, ?>.ProcessContext c);
/**
* Forwards the call to a {@link PerKeyCombineFn} to combine the inputs and extract output
- * in a {@link DoFn}.
+ * in an {@link OldDoFn}.
*
- * <p>It constructs a {@code CombineWithContext.Context} from {@code DoFn.ProcessContext}
+ * <p>It constructs a {@code CombineWithContext.Context} from {@code OldDoFn.ProcessContext}
* if it is required.
*/
- public OutputT apply(K key, Iterable<? extends InputT> inputs, DoFn<?, ?>.ProcessContext c);
+ public OutputT apply(K key, Iterable<? extends InputT> inputs, OldDoFn<?, ?>.ProcessContext c);
/**
- * Forwards the call to a {@link PerKeyCombineFn} to add all inputs in a {@link DoFn}.
+ * Forwards the call to a {@link PerKeyCombineFn} to add all inputs in an {@link OldDoFn}.
*
- * <p>It constructs a {@code CombineWithContext.Context} from {@code DoFn.ProcessContext}
+ * <p>It constructs a {@code CombineWithContext.Context} from {@code OldDoFn.ProcessContext}
* if it is required.
*/
- public AccumT addInputs(K key, Iterable<InputT> inputs, DoFn<?, ?>.ProcessContext c);
+ public AccumT addInputs(K key, Iterable<InputT> inputs, OldDoFn<?, ?>.ProcessContext c);
/////////////////////////////////////////////////////////////////////////////
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PerKeyCombineFnRunners.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PerKeyCombineFnRunners.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PerKeyCombineFnRunners.java
index 2d28682..87870a8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PerKeyCombineFnRunners.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PerKeyCombineFnRunners.java
@@ -23,7 +23,7 @@ import org.apache.beam.sdk.transforms.CombineFnBase.PerKeyCombineFn;
import org.apache.beam.sdk.transforms.CombineWithContext;
import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
import org.apache.beam.sdk.transforms.CombineWithContext.RequiresContextInternal;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import com.google.common.collect.Iterables;
@@ -69,39 +69,39 @@ public class PerKeyCombineFnRunners {
}
@Override
- public AccumT createAccumulator(K key, DoFn<?, ?>.ProcessContext c) {
+ public AccumT createAccumulator(K key, OldDoFn<?, ?>.ProcessContext c) {
return keyedCombineFn.createAccumulator(key);
}
@Override
public AccumT addInput(
- K key, AccumT accumulator, InputT input, DoFn<?, ?>.ProcessContext c) {
+ K key, AccumT accumulator, InputT input, OldDoFn<?, ?>.ProcessContext c) {
return keyedCombineFn.addInput(key, accumulator, input);
}
@Override
public AccumT mergeAccumulators(
- K key, Iterable<AccumT> accumulators, DoFn<?, ?>.ProcessContext c) {
+ K key, Iterable<AccumT> accumulators, OldDoFn<?, ?>.ProcessContext c) {
return keyedCombineFn.mergeAccumulators(key, accumulators);
}
@Override
- public OutputT extractOutput(K key, AccumT accumulator, DoFn<?, ?>.ProcessContext c) {
+ public OutputT extractOutput(K key, AccumT accumulator, OldDoFn<?, ?>.ProcessContext c) {
return keyedCombineFn.extractOutput(key, accumulator);
}
@Override
- public AccumT compact(K key, AccumT accumulator, DoFn<?, ?>.ProcessContext c) {
+ public AccumT compact(K key, AccumT accumulator, OldDoFn<?, ?>.ProcessContext c) {
return keyedCombineFn.compact(key, accumulator);
}
@Override
- public OutputT apply(K key, Iterable<? extends InputT> inputs, DoFn<?, ?>.ProcessContext c) {
+ public OutputT apply(K key, Iterable<? extends InputT> inputs, OldDoFn<?, ?>.ProcessContext c) {
return keyedCombineFn.apply(key, inputs);
}
@Override
- public AccumT addInputs(K key, Iterable<InputT> inputs, DoFn<?, ?>.ProcessContext c) {
+ public AccumT addInputs(K key, Iterable<InputT> inputs, OldDoFn<?, ?>.ProcessContext c) {
AccumT accum = keyedCombineFn.createAccumulator(key);
for (InputT input : inputs) {
accum = keyedCombineFn.addInput(key, accum, input);
@@ -165,45 +165,45 @@ public class PerKeyCombineFnRunners {
}
@Override
- public AccumT createAccumulator(K key, DoFn<?, ?>.ProcessContext c) {
+ public AccumT createAccumulator(K key, OldDoFn<?, ?>.ProcessContext c) {
return keyedCombineFnWithContext.createAccumulator(key,
CombineContextFactory.createFromProcessContext(c));
}
@Override
public AccumT addInput(
- K key, AccumT accumulator, InputT value, DoFn<?, ?>.ProcessContext c) {
+ K key, AccumT accumulator, InputT value, OldDoFn<?, ?>.ProcessContext c) {
return keyedCombineFnWithContext.addInput(key, accumulator, value,
CombineContextFactory.createFromProcessContext(c));
}
@Override
public AccumT mergeAccumulators(
- K key, Iterable<AccumT> accumulators, DoFn<?, ?>.ProcessContext c) {
+ K key, Iterable<AccumT> accumulators, OldDoFn<?, ?>.ProcessContext c) {
return keyedCombineFnWithContext.mergeAccumulators(
key, accumulators, CombineContextFactory.createFromProcessContext(c));
}
@Override
- public OutputT extractOutput(K key, AccumT accumulator, DoFn<?, ?>.ProcessContext c) {
+ public OutputT extractOutput(K key, AccumT accumulator, OldDoFn<?, ?>.ProcessContext c) {
return keyedCombineFnWithContext.extractOutput(key, accumulator,
CombineContextFactory.createFromProcessContext(c));
}
@Override
- public AccumT compact(K key, AccumT accumulator, DoFn<?, ?>.ProcessContext c) {
+ public AccumT compact(K key, AccumT accumulator, OldDoFn<?, ?>.ProcessContext c) {
return keyedCombineFnWithContext.compact(key, accumulator,
CombineContextFactory.createFromProcessContext(c));
}
@Override
- public OutputT apply(K key, Iterable<? extends InputT> inputs, DoFn<?, ?>.ProcessContext c) {
+ public OutputT apply(K key, Iterable<? extends InputT> inputs, OldDoFn<?, ?>.ProcessContext c) {
return keyedCombineFnWithContext.apply(key, inputs,
CombineContextFactory.createFromProcessContext(c));
}
@Override
- public AccumT addInputs(K key, Iterable<InputT> inputs, DoFn<?, ?>.ProcessContext c) {
+ public AccumT addInputs(K key, Iterable<InputT> inputs, OldDoFn<?, ?>.ProcessContext c) {
CombineWithContext.Context combineContext = CombineContextFactory.createFromProcessContext(c);
AccumT accum = keyedCombineFnWithContext.createAccumulator(key, combineContext);
for (InputT input : inputs) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java
index 36c4a9f..9e6c7d2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubClient.java
@@ -34,6 +34,7 @@ import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
+
import javax.annotation.Nullable;
/**
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java
index 9fa0380..88ae6cc 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PubsubTestClient.java
@@ -34,6 +34,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
+
import javax.annotation.Nullable;
/**
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReifyTimestampAndWindowsDoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReifyTimestampAndWindowsDoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReifyTimestampAndWindowsDoFn.java
index c2273f5..2808ca9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReifyTimestampAndWindowsDoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReifyTimestampAndWindowsDoFn.java
@@ -17,11 +17,11 @@
*/
package org.apache.beam.sdk.util;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.values.KV;
/**
- * DoFn that makes timestamps and window assignments explicit in the value part of each key/value
+ * OldDoFn that makes timestamps and window assignments explicit in the value part of each key/value
* pair.
*
* @param <K> the type of the keys of the input and output {@code PCollection}s
@@ -29,7 +29,7 @@ import org.apache.beam.sdk.values.KV;
*/
@SystemDoFnInternal
public class ReifyTimestampAndWindowsDoFn<K, V>
- extends DoFn<KV<K, V>, KV<K, WindowedValue<V>>> {
+ extends OldDoFn<KV<K, V>, KV<K, WindowedValue<V>>> {
@Override
public void processElement(ProcessContext c)
throws Exception {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java
index 6c58689..66c7cc0 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java
@@ -17,8 +17,8 @@
*/
package org.apache.beam.sdk.util;
-import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -70,7 +70,7 @@ public class Reshuffle<K, V> extends PTransform<PCollection<KV<K, V>>, PCollecti
// set allowed lateness.
.setWindowingStrategyInternal(originalStrategy)
.apply("ExpandIterable", ParDo.of(
- new DoFn<KV<K, Iterable<V>>, KV<K, V>>() {
+ new OldDoFn<KV<K, Iterable<V>>, KV<K, V>>() {
@Override
public void processElement(ProcessContext c) {
K key = c.element().getKey();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SerializableUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SerializableUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SerializableUtils.java
index 45f6c4a..1e70aaf 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SerializableUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SerializableUtils.java
@@ -105,7 +105,7 @@ public class SerializableUtils {
*/
public static CloudObject ensureSerializable(Coder<?> coder) {
// Make sure that Coders are java serializable as well since
- // they are regularly captured within DoFn's.
+ // they are regularly captured within OldDoFn's.
Coder<?> copy = (Coder<?>) ensureSerializable((Serializable) coder);
CloudObject cloudObject = copy.asCloudObject();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/util/StringUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/StringUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/StringUtils.java
index 53201a4..bb59373 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/StringUtils.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/StringUtils.java
@@ -98,7 +98,7 @@ public class StringUtils {
}
private static final String[] STANDARD_NAME_SUFFIXES =
- new String[]{"DoFn", "Fn"};
+ new String[]{"OldDoFn", "Fn"};
/**
* Pattern to match a non-anonymous inner class.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SystemDoFnInternal.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SystemDoFnInternal.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SystemDoFnInternal.java
index 9a42b23..b8a5cd4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SystemDoFnInternal.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/SystemDoFnInternal.java
@@ -17,7 +17,7 @@
*/
package org.apache.beam.sdk.util;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
@@ -26,10 +26,10 @@ import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
- * Annotation to mark {@link DoFn DoFns} as an internal component of the Dataflow SDK.
+ * Annotation to mark {@link OldDoFn OldDoFns} as an internal component of the Dataflow SDK.
*
* <p>Currently, the only effect of this is to mark any aggregators reported by an annotated
- * {@code DoFn} as a system counter (as opposed to a user counter).
+ * {@code OldDoFn} as a system counter (as opposed to a user counter).
*
* <p>This is internal to the Dataflow SDK.
*/
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerInternals.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerInternals.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerInternals.java
index c03ab4d..3212d64 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerInternals.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerInternals.java
@@ -33,7 +33,6 @@ import com.google.common.base.MoreObjects;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
-
import org.joda.time.Instant;
import java.io.IOException;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ValueWithRecordId.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ValueWithRecordId.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ValueWithRecordId.java
index e724349..f0e4812 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ValueWithRecordId.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ValueWithRecordId.java
@@ -22,7 +22,7 @@ import static com.google.common.base.Preconditions.checkArgument;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StandardCoder;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
import com.google.common.base.MoreObjects;
@@ -139,8 +139,8 @@ public class ValueWithRecordId<ValueT> {
ByteArrayCoder idCoder;
}
- /** {@link DoFn} to turn a {@code ValueWithRecordId<T>} back to the value {@code T}. */
- public static class StripIdsDoFn<T> extends DoFn<ValueWithRecordId<T>, T> {
+ /** {@link OldDoFn} to turn a {@code ValueWithRecordId<T>} back to the value {@code T}. */
+ public static class StripIdsDoFn<T> extends OldDoFn<ValueWithRecordId<T>, T> {
@Override
public void processElement(ProcessContext c) {
c.output(c.element().getValue());
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
index 676848c..9d341a1 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowedValue.java
@@ -38,7 +38,6 @@ import com.google.common.collect.ImmutableList;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
-
import org.joda.time.Instant;
import java.io.IOException;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingInternals.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingInternals.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingInternals.java
index 149c497..3a1b654 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingInternals.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingInternals.java
@@ -30,8 +30,8 @@ import java.io.IOException;
import java.util.Collection;
/**
- * Interface that may be required by some (internal) {@code DoFn}s to implement windowing. It should
- * not be necessary for general user code to interact with this at all.
+ * Interface that may be required by some (internal) {@code OldDoFn}s to implement windowing. It
+ * should not be necessary for general user code to interact with this at all.
*
* <p>This interface should be provided by runner implementors to support windowing on their runner.
*
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/ReflectHelpers.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/ReflectHelpers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/ReflectHelpers.java
index 75b8ad8..6db532e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/ReflectHelpers.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/ReflectHelpers.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.util.common;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
+
import static java.util.Arrays.asList;
import com.google.common.base.Function;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java
index b60a53e..69bf77d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java
@@ -27,7 +27,6 @@ import org.apache.beam.sdk.util.PropertyNames;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
-
import org.joda.time.Instant;
import java.io.IOException;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java
index 8abfb05..5137031 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java
@@ -36,8 +36,8 @@ import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.RunnableOnService;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.util.UserCodeException;
@@ -146,9 +146,9 @@ public class PipelineTest {
private static PTransform<PCollection<? extends String>, PCollection<String>> addSuffix(
final String suffix) {
- return ParDo.of(new DoFn<String, String>() {
+ return ParDo.of(new OldDoFn<String, String>() {
@Override
- public void processElement(DoFn<String, String>.ProcessContext c) {
+ public void processElement(OldDoFn<String, String>.ProcessContext c) {
c.output(c.element() + suffix);
}
});
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java
index 54f7ec1..41d0932 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/AvroCoderTest.java
@@ -31,7 +31,7 @@ import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.util.CloudObject;
import org.apache.beam.sdk.util.SerializableUtils;
@@ -134,7 +134,7 @@ public class AvroCoderTest {
}
}
- private static class GetTextFn extends DoFn<Pojo, String> {
+ private static class GetTextFn extends OldDoFn<Pojo, String> {
@Override
public void processElement(ProcessContext c) {
c.output(c.element().text);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
index 817ea20..35ec6c6 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/CoderRegistryTest.java
@@ -27,7 +27,7 @@ import org.apache.beam.sdk.coders.protobuf.ProtoCoder;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.util.CloudObject;
@@ -366,7 +366,7 @@ public class CoderRegistryTest {
private static class PTransformOutputingMySerializableGeneric
extends PTransform<PCollection<String>, PCollection<KV<String, MySerializableGeneric<String>>>> {
- private class OutputDoFn extends DoFn<String, KV<String, MySerializableGeneric<String>>> {
+ private class OutputDoFn extends OldDoFn<String, KV<String, MySerializableGeneric<String>>> {
@Override
public void processElement(ProcessContext c) { }
}
@@ -430,7 +430,7 @@ public class CoderRegistryTest {
PCollection<String>,
PCollection<KV<String, MySerializableGeneric<T>>>> {
- private class OutputDoFn extends DoFn<String, KV<String, MySerializableGeneric<T>>> {
+ private class OutputDoFn extends OldDoFn<String, KV<String, MySerializableGeneric<T>>> {
@Override
public void processElement(ProcessContext c) { }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SerializableCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SerializableCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SerializableCoderTest.java
index d6423e5..3e7fd50 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SerializableCoderTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/SerializableCoderTest.java
@@ -26,7 +26,7 @@ import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.util.CloudObject;
import org.apache.beam.sdk.util.CoderUtils;
@@ -82,14 +82,14 @@ public class SerializableCoderTest implements Serializable {
}
}
- static class StringToRecord extends DoFn<String, MyRecord> {
+ static class StringToRecord extends OldDoFn<String, MyRecord> {
@Override
public void processElement(ProcessContext c) {
c.output(new MyRecord(c.element()));
}
}
- static class RecordToString extends DoFn<MyRecord, String> {
+ static class RecordToString extends OldDoFn<MyRecord, String> {
@Override
public void processElement(ProcessContext c) {
c.output(c.element().value);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java
index c7153f8..09405ab 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroSourceTest.java
@@ -18,6 +18,7 @@
package org.apache.beam.sdk.io;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSourceTest.java
index cabfc21..fe9415b 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSourceTest.java
@@ -18,6 +18,7 @@
package org.apache.beam.sdk.io;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includesDisplayDataFrom;
+
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java
index 8fbed94..01e5fe5 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CompressedSourceTest.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.io;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includesDisplayDataFrom;
+
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.not;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java
index c5f7478..95f7454 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingInputTest.java
@@ -18,6 +18,7 @@
package org.apache.beam.sdk.io;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
@@ -28,9 +29,9 @@ import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.RunnableOnService;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Max;
import org.apache.beam.sdk.transforms.Min;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.RemoveDuplicates;
@@ -119,7 +120,7 @@ public class CountingInputTest {
assertThat(endTime.isAfter(startTime.plus(expectedRuntimeMillis)), is(true));
}
- private static class ElementValueDiff extends DoFn<Long, Long> {
+ private static class ElementValueDiff extends OldDoFn<Long, Long> {
@Override
public void processElement(ProcessContext c) throws Exception {
c.output(c.element() - c.timestamp().getMillis());
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java
index 321f066..45f636f 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/CountingSourceTest.java
@@ -34,10 +34,10 @@ import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.RunnableOnService;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.Max;
import org.apache.beam.sdk.transforms.Min;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.RemoveDuplicates;
import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -159,7 +159,7 @@ public class CountingSourceTest {
p.run();
}
- private static class ElementValueDiff extends DoFn<Long, Long> {
+ private static class ElementValueDiff extends OldDoFn<Long, Long> {
@Override
public void processElement(ProcessContext c) throws Exception {
c.output(c.element() - c.timestamp().getMillis());
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/io/OffsetBasedSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/OffsetBasedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/OffsetBasedSourceTest.java
index 7009023..f689f51 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/OffsetBasedSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/OffsetBasedSourceTest.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.io;
import static org.apache.beam.sdk.testing.SourceTestUtils.assertSplitAtFractionExhaustive;
import static org.apache.beam.sdk.testing.SourceTestUtils.readFromSource;
+
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java
index 9c75972..f8592c9 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubUnboundedSinkTest.java
@@ -24,7 +24,7 @@ import org.apache.beam.sdk.testing.CoderProperties;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.util.PubsubClient;
import org.apache.beam.sdk.util.PubsubClient.OutgoingMessage;
@@ -58,7 +58,7 @@ public class PubsubUnboundedSinkTest {
private static final String ID_LABEL = "id";
private static final int NUM_SHARDS = 10;
- private static class Stamp extends DoFn<String, String> {
+ private static class Stamp extends OldDoFn<String, String> {
@Override
public void processElement(ProcessContext c) {
c.outputWithTimestamp(c.element(), new Instant(TIMESTAMP));
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/io/ReadTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/ReadTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/ReadTest.java
index 237c025..a47ddf2 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/ReadTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/ReadTest.java
@@ -37,6 +37,7 @@ import org.junit.runners.JUnit4;
import java.io.IOException;
import java.io.Serializable;
import java.util.List;
+
import javax.annotation.Nullable;
/**
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
index a1f1f70..6ec3a71 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
@@ -23,6 +23,7 @@ import static org.apache.beam.sdk.TestUtils.NO_INTS_ARRAY;
import static org.apache.beam.sdk.TestUtils.NO_LINES_ARRAY;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasValue;
+
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.startsWith;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
index 0af0744..4b6e749 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.io;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includesDisplayDataFrom;
+
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
@@ -40,9 +41,9 @@ import org.apache.beam.sdk.options.PipelineOptionsFactoryTest.TestPipelineOption
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
@@ -101,14 +102,14 @@ public class WriteTest {
this.window = window;
}
- private static class AddArbitraryKey<T> extends DoFn<T, KV<Integer, T>> {
+ private static class AddArbitraryKey<T> extends OldDoFn<T, KV<Integer, T>> {
@Override
public void processElement(ProcessContext c) throws Exception {
c.output(KV.of(ThreadLocalRandom.current().nextInt(), c.element()));
}
}
- private static class RemoveArbitraryKey<T> extends DoFn<KV<Integer, Iterable<T>>, T> {
+ private static class RemoveArbitraryKey<T> extends OldDoFn<KV<Integer, Iterable<T>>, T> {
@Override
public void processElement(ProcessContext c) throws Exception {
for (T s : c.element().getValue()) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSinkTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSinkTest.java
index 98aee4e..ea0db73 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSinkTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSinkTest.java
@@ -46,6 +46,7 @@ import java.nio.channels.WritableByteChannel;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlRootElement;
import javax.xml.bind.annotation.XmlType;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/options/GcpOptionsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/GcpOptionsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/GcpOptionsTest.java
index 22359dc..ec2902e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/GcpOptionsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/GcpOptionsTest.java
@@ -18,6 +18,7 @@
package org.apache.beam.sdk.options;
import static com.google.common.base.Strings.isNullOrEmpty;
+
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/options/GoogleApiDebugOptionsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/GoogleApiDebugOptionsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/GoogleApiDebugOptionsTest.java
index 546fe7d..8e1439b 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/GoogleApiDebugOptionsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/GoogleApiDebugOptionsTest.java
@@ -29,7 +29,6 @@ import com.google.api.services.bigquery.Bigquery.Datasets.Delete;
import com.google.api.services.storage.Storage;
import com.fasterxml.jackson.databind.ObjectMapper;
-
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java
index 8b8337e..0c1b596 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsFactoryTest.java
@@ -43,7 +43,6 @@ import com.google.common.collect.ListMultimap;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
-
import org.hamcrest.Matchers;
import org.junit.Rule;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsTest.java
index 687271c..b2efa61 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/PipelineOptionsTest.java
@@ -28,7 +28,6 @@ import com.google.common.collect.Sets;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.databind.ObjectMapper;
-
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java
index 110f30a..c4c5c1c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/options/ProxyInvocationHandlerTest.java
@@ -22,6 +22,7 @@ import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasKey;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasNamespace;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasType;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasValue;
+
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.hasItem;
@@ -43,7 +44,6 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
-
import org.hamcrest.Matchers;
import org.joda.time.Instant;
import org.junit.Rule;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/AggregatorPipelineExtractorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/AggregatorPipelineExtractorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/AggregatorPipelineExtractorTest.java
index 74cc5e0..13476e2 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/AggregatorPipelineExtractorTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/AggregatorPipelineExtractorTest.java
@@ -27,9 +27,9 @@ import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.Pipeline.PipelineVisitor;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Max;
import org.apache.beam.sdk.transforms.Min;
+import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
@@ -211,7 +211,7 @@ public class AggregatorPipelineExtractorTest {
}
}
- private static class AggregatorProvidingDoFn<InT, OuT> extends DoFn<InT, OuT> {
+ private static class AggregatorProvidingDoFn<InT, OuT> extends OldDoFn<InT, OuT> {
public <InputT, OutT> Aggregator<InputT, OutT> addAggregator(
CombineFn<InputT, ?, OutT> combiner) {
return createAggregator(randomName(), combiner);
@@ -222,7 +222,7 @@ public class AggregatorPipelineExtractorTest {
}
@Override
- public void processElement(DoFn<InT, OuT>.ProcessContext c) throws Exception {
+ public void processElement(OldDoFn<InT, OuT>.ProcessContext c) throws Exception {
fail();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
index 1070dab..acc2b48 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PAssertTest.java
@@ -39,7 +39,6 @@ import org.apache.beam.sdk.values.TimestampedValue;
import com.google.common.collect.Iterables;
import com.fasterxml.jackson.annotation.JsonCreator;
-
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Rule;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java
index 043c06c..0bd7893 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java
@@ -31,7 +31,6 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import com.fasterxml.jackson.databind.ObjectMapper;
-
import org.hamcrest.BaseMatcher;
import org.hamcrest.Description;
import org.junit.Rule;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateQuantilesTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateQuantilesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateQuantilesTest.java
index 8c2451b..fc10d4b 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateQuantilesTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateQuantilesTest.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.transforms;
import static org.apache.beam.sdk.TestUtils.checkCombineFn;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.collection.IsIterableContainingInOrder.contains;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java
index 1a42947..5c8732f 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ApproximateUniqueTest.java
@@ -18,6 +18,7 @@
package org.apache.beam.sdk.transforms;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -53,7 +54,7 @@ import java.util.List;
*/
@RunWith(JUnit4.class)
public class ApproximateUniqueTest implements Serializable {
- // implements Serializable just to make it easy to use anonymous inner DoFn subclasses
+ // implements Serializable just to make it easy to use anonymous inner OldDoFn subclasses
@Test
public void testEstimationErrorToSampleSize() {
@@ -222,7 +223,7 @@ public class ApproximateUniqueTest implements Serializable {
.apply(View.<Long>asSingleton());
PCollection<KV<Long, Long>> approximateAndExact = approximate
- .apply(ParDo.of(new DoFn<Long, KV<Long, Long>>() {
+ .apply(ParDo.of(new OldDoFn<Long, KV<Long, Long>>() {
@Override
public void processElement(ProcessContext c) {
c.output(KV.of(c.element(), c.sideInput(exact)));
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
index 486c738..d6bf826 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java
@@ -461,7 +461,7 @@ public class CombineFnsTest {
}
private static class ExtractResultDoFn
- extends DoFn<KV<String, CoCombineResult>, KV<String, KV<Integer, String>>>{
+ extends OldDoFn<KV<String, CoCombineResult>, KV<String, KV<Integer, String>>> {
private final TupleTag<Integer> maxIntTag;
private final TupleTag<UserString> concatStringTag;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
index b453089..cb9928e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java
@@ -25,6 +25,7 @@ import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.include
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
+
import static org.hamcrest.Matchers.hasItem;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
@@ -73,7 +74,6 @@ import com.google.common.collect.Sets;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
-
import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.junit.Test;
@@ -117,7 +117,7 @@ public class CombineTest implements Serializable {
1, 1, 2, 3, 5, 8, 13, 21, 34, 55
};
- @Mock private DoFn<?, ?>.ProcessContext processContext;
+ @Mock private OldDoFn<?, ?>.ProcessContext processContext;
PCollection<KV<String, Integer>> createInput(Pipeline p,
KV<String, Integer>[] table) {
@@ -372,7 +372,7 @@ public class CombineTest implements Serializable {
pipeline.run();
}
- private static class FormatPaneInfo extends DoFn<Integer, String> {
+ private static class FormatPaneInfo extends OldDoFn<Integer, String> {
@Override
public void processElement(ProcessContext c) {
c.output(c.element() + ": " + c.pane().isLast());
@@ -560,7 +560,7 @@ public class CombineTest implements Serializable {
pipeline.run();
}
- private static class GetLast extends DoFn<Integer, Integer> {
+ private static class GetLast extends OldDoFn<Integer, Integer> {
@Override
public void processElement(ProcessContext c) {
if (c.pane().isLast()) {
@@ -653,7 +653,7 @@ public class CombineTest implements Serializable {
PCollection<Integer> output = pipeline
.apply("CreateVoidMainInput", Create.of((Void) null))
- .apply("OutputSideInput", ParDo.of(new DoFn<Void, Integer>() {
+ .apply("OutputSideInput", ParDo.of(new OldDoFn<Void, Integer>() {
@Override
public void processElement(ProcessContext c) {
c.output(c.sideInput(view));
@@ -1176,7 +1176,7 @@ public class CombineTest implements Serializable {
}
private static <T> PCollection<T> copy(PCollection<T> pc, final int n) {
- return pc.apply(ParDo.of(new DoFn<T, T>() {
+ return pc.apply(ParDo.of(new OldDoFn<T, T>() {
@Override
public void processElement(ProcessContext c) throws Exception {
for (int i = 0; i < n; i++) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
index 07ba002..cf65423 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CreateTest.java
@@ -229,7 +229,7 @@ public class CreateTest {
p.run();
}
- private static class PrintTimestamps extends DoFn<String, String> {
+ private static class PrintTimestamps extends OldDoFn<String, String> {
@Override
public void processElement(ProcessContext c) {
c.output(c.element() + ":" + c.timestamp().getMillis());
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnContextTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnContextTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnContextTest.java
deleted file mode 100644
index 2e588c7..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnContextTest.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms;
-
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-
-/**
- * Tests for {@link DoFn.Context}.
- */
-@RunWith(JUnit4.class)
-public class DoFnContextTest {
-
- @Mock
- private Aggregator<Long, Long> agg;
-
- private DoFn<Object, Object> fn;
- private DoFn<Object, Object>.Context context;
-
- @Before
- public void setup() {
- MockitoAnnotations.initMocks(this);
-
- // Need to be real objects to call the constructor, and to reference the
- // outer instance of DoFn
- NoOpDoFn<Object, Object> noOpFn = new NoOpDoFn<>();
- DoFn<Object, Object>.Context noOpContext = noOpFn.context();
-
- fn = spy(noOpFn);
- context = spy(noOpContext);
- }
-
- @Test
- public void testSetupDelegateAggregatorsCreatesAndLinksDelegateAggregators() {
- Sum.SumLongFn combiner = new Sum.SumLongFn();
- Aggregator<Long, Long> delegateAggregator =
- fn.createAggregator("test", combiner);
-
- when(context.createAggregatorInternal("test", combiner)).thenReturn(agg);
-
- context.setupDelegateAggregators();
- delegateAggregator.addValue(1L);
-
- verify(agg).addValue(1L);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnDelegatingAggregatorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnDelegatingAggregatorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnDelegatingAggregatorTest.java
index bf9899c..2488042 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnDelegatingAggregatorTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnDelegatingAggregatorTest.java
@@ -24,7 +24,7 @@ import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.DoFn.DelegatingAggregator;
+import org.apache.beam.sdk.transforms.OldDoFn.DelegatingAggregator;
import org.junit.Before;
import org.junit.Rule;
@@ -36,7 +36,7 @@ import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
/**
- * Tests for DoFn.DelegatingAggregator.
+ * Tests for OldDoFn.DelegatingAggregator.
*/
@RunWith(JUnit4.class)
public class DoFnDelegatingAggregatorTest {
@@ -54,7 +54,7 @@ public class DoFnDelegatingAggregatorTest {
@Test
public void testAddValueWithoutDelegateThrowsException() {
- DoFn<Void, Void> doFn = new NoOpDoFn<>();
+ OldDoFn<Void, Void> doFn = new NoOpDoFn<>();
String name = "agg";
CombineFn<Double, ?, Double> combiner = mockCombineFn(Double.class);
@@ -64,7 +64,7 @@ public class DoFnDelegatingAggregatorTest {
thrown.expect(IllegalStateException.class);
thrown.expectMessage("cannot be called");
- thrown.expectMessage("DoFn");
+ thrown.expectMessage("OldDoFn");
aggregator.addValue(21.2);
}
@@ -74,7 +74,7 @@ public class DoFnDelegatingAggregatorTest {
String name = "agg";
CombineFn<Long, ?, Long> combiner = mockCombineFn(Long.class);
- DoFn<Void, Void> doFn = new NoOpDoFn<>();
+ OldDoFn<Void, Void> doFn = new NoOpDoFn<>();
DelegatingAggregator<Long, Long> aggregator =
(DelegatingAggregator<Long, Long>) doFn.createAggregator(name, combiner);
@@ -91,7 +91,7 @@ public class DoFnDelegatingAggregatorTest {
String name = "agg";
CombineFn<Double, ?, Double> combiner = mockCombineFn(Double.class);
- DoFn<Void, Void> doFn = new NoOpDoFn<>();
+ OldDoFn<Void, Void> doFn = new NoOpDoFn<>();
DelegatingAggregator<Double, Double> aggregator =
(DelegatingAggregator<Double, Double>) doFn.createAggregator(name, combiner);
@@ -114,7 +114,7 @@ public class DoFnDelegatingAggregatorTest {
String name = "agg";
CombineFn<Double, ?, Double> combiner = mockCombineFn(Double.class);
- DoFn<Void, Void> doFn = new NoOpDoFn<>();
+ OldDoFn<Void, Void> doFn = new NoOpDoFn<>();
DelegatingAggregator<Double, Double> aggregator =
(DelegatingAggregator<Double, Double>) doFn.createAggregator(name, combiner);
@@ -127,7 +127,7 @@ public class DoFnDelegatingAggregatorTest {
String name = "agg";
CombineFn<Double, ?, Double> combiner = mockCombineFn(Double.class);
- DoFn<Void, Void> doFn = new NoOpDoFn<>();
+ OldDoFn<Void, Void> doFn = new NoOpDoFn<>();
DelegatingAggregator<Double, Double> aggregator =
(DelegatingAggregator<Double, Double>) doFn.createAggregator(name, combiner);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnReflectorTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnReflectorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnReflectorTest.java
index 3238f2c..0cb3d7b 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnReflectorTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnReflectorTest.java
@@ -158,7 +158,7 @@ public class DoFnReflectorTest {
@Test
public void testDoFnInvokersReused() throws Exception {
- // Ensures that we don't create a new Invoker class for every instance of the DoFn.
+ // Ensures that we don't create a new Invoker class for every instance of the OldDoFn.
IdentityParent fn1 = new IdentityParent();
IdentityParent fn2 = new IdentityParent();
DoFnReflector reflector1 = underTest(fn1);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTest.java
deleted file mode 100644
index 9242ece..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTest.java
+++ /dev/null
@@ -1,242 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms;
-
-import static org.hamcrest.Matchers.empty;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.isA;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertThat;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
-import org.apache.beam.sdk.PipelineResult;
-import org.apache.beam.sdk.runners.AggregatorValues;
-import org.apache.beam.sdk.testing.NeedsRunner;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.Max.MaxIntegerFn;
-import org.apache.beam.sdk.transforms.Sum.SumIntegerFn;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-
-import com.google.common.collect.ImmutableMap;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-import java.io.Serializable;
-import java.util.Map;
-
-/**
- * Tests for DoFn.
- */
-@RunWith(JUnit4.class)
-public class DoFnTest implements Serializable {
-
- @Rule
- public transient ExpectedException thrown = ExpectedException.none();
-
- @Test
- public void testCreateAggregatorWithCombinerSucceeds() {
- String name = "testAggregator";
- Sum.SumLongFn combiner = new Sum.SumLongFn();
-
- DoFn<Void, Void> doFn = new NoOpDoFn<>();
-
- Aggregator<Long, Long> aggregator = doFn.createAggregator(name, combiner);
-
- assertEquals(name, aggregator.getName());
- assertEquals(combiner, aggregator.getCombineFn());
- }
-
- @Test
- public void testCreateAggregatorWithNullNameThrowsException() {
- thrown.expect(NullPointerException.class);
- thrown.expectMessage("name cannot be null");
-
- DoFn<Void, Void> doFn = new NoOpDoFn<>();
-
- doFn.createAggregator(null, new Sum.SumLongFn());
- }
-
- @Test
- public void testCreateAggregatorWithNullCombineFnThrowsException() {
- CombineFn<Object, Object, Object> combiner = null;
-
- thrown.expect(NullPointerException.class);
- thrown.expectMessage("combiner cannot be null");
-
- DoFn<Void, Void> doFn = new NoOpDoFn<>();
-
- doFn.createAggregator("testAggregator", combiner);
- }
-
- @Test
- public void testCreateAggregatorWithNullSerializableFnThrowsException() {
- SerializableFunction<Iterable<Object>, Object> combiner = null;
-
- thrown.expect(NullPointerException.class);
- thrown.expectMessage("combiner cannot be null");
-
- DoFn<Void, Void> doFn = new NoOpDoFn<>();
-
- doFn.createAggregator("testAggregator", combiner);
- }
-
- @Test
- public void testCreateAggregatorWithSameNameThrowsException() {
- String name = "testAggregator";
- CombineFn<Double, ?, Double> combiner = new Max.MaxDoubleFn();
-
- DoFn<Void, Void> doFn = new NoOpDoFn<>();
-
- doFn.createAggregator(name, combiner);
-
- thrown.expect(IllegalArgumentException.class);
- thrown.expectMessage("Cannot create");
- thrown.expectMessage(name);
- thrown.expectMessage("already exists");
-
- doFn.createAggregator(name, combiner);
- }
-
- @Test
- public void testCreateAggregatorsWithDifferentNamesSucceeds() {
- String nameOne = "testAggregator";
- String nameTwo = "aggregatorPrime";
- CombineFn<Double, ?, Double> combiner = new Max.MaxDoubleFn();
-
- DoFn<Void, Void> doFn = new NoOpDoFn<>();
-
- Aggregator<Double, Double> aggregatorOne =
- doFn.createAggregator(nameOne, combiner);
- Aggregator<Double, Double> aggregatorTwo =
- doFn.createAggregator(nameTwo, combiner);
-
- assertNotEquals(aggregatorOne, aggregatorTwo);
- }
-
- @Test
- @Category(NeedsRunner.class)
- public void testCreateAggregatorInStartBundleThrows() {
- TestPipeline p = createTestPipeline(new DoFn<String, String>() {
- @Override
- public void startBundle(DoFn<String, String>.Context c) throws Exception {
- createAggregator("anyAggregate", new MaxIntegerFn());
- }
-
- @Override
- public void processElement(DoFn<String, String>.ProcessContext c) throws Exception {}
- });
-
- thrown.expect(PipelineExecutionException.class);
- thrown.expectCause(isA(IllegalStateException.class));
-
- p.run();
- }
-
- @Test
- @Category(NeedsRunner.class)
- public void testCreateAggregatorInProcessElementThrows() {
- TestPipeline p = createTestPipeline(new DoFn<String, String>() {
- @Override
- public void processElement(ProcessContext c) throws Exception {
- createAggregator("anyAggregate", new MaxIntegerFn());
- }
- });
-
- thrown.expect(PipelineExecutionException.class);
- thrown.expectCause(isA(IllegalStateException.class));
-
- p.run();
- }
-
- @Test
- @Category(NeedsRunner.class)
- public void testCreateAggregatorInFinishBundleThrows() {
- TestPipeline p = createTestPipeline(new DoFn<String, String>() {
- @Override
- public void finishBundle(DoFn<String, String>.Context c) throws Exception {
- createAggregator("anyAggregate", new MaxIntegerFn());
- }
-
- @Override
- public void processElement(DoFn<String, String>.ProcessContext c) throws Exception {}
- });
-
- thrown.expect(PipelineExecutionException.class);
- thrown.expectCause(isA(IllegalStateException.class));
-
- p.run();
- }
-
- /**
- * Initialize a test pipeline with the specified {@link DoFn}.
- */
- private <InputT, OutputT> TestPipeline createTestPipeline(DoFn<InputT, OutputT> fn) {
- TestPipeline pipeline = TestPipeline.create();
- pipeline.apply(Create.of((InputT) null))
- .apply(ParDo.of(fn));
-
- return pipeline;
- }
-
- @Test
- public void testPopulateDisplayDataDefaultBehavior() {
- DoFn<String, String> usesDefault =
- new DoFn<String, String>() {
- @Override
- public void processElement(ProcessContext c) throws Exception {}
- };
-
- DisplayData data = DisplayData.from(usesDefault);
- assertThat(data.items(), empty());
- }
-
- @Test
- @Category(NeedsRunner.class)
- public void testAggregators() throws Exception {
- Pipeline pipeline = TestPipeline.create();
-
- CountOddsFn countOdds = new CountOddsFn();
- pipeline
- .apply(Create.of(1, 3, 5, 7, 2, 4, 6, 8, 10, 12, 14, 20, 42, 68, 100))
- .apply(ParDo.of(countOdds));
- PipelineResult result = pipeline.run();
-
- AggregatorValues<Integer> values = result.getAggregatorValues(countOdds.aggregator);
- assertThat(values.getValuesAtSteps(),
- equalTo((Map<String, Integer>) ImmutableMap.<String, Integer>of("ParDo(CountOdds)", 4)));
- }
-
- private static class CountOddsFn extends DoFn<Integer, Void> {
- @Override
- public void processElement(ProcessContext c) throws Exception {
- if (c.element() % 2 == 1) {
- aggregator.addValue(1);
- }
- }
-
- Aggregator<Integer, Integer> aggregator =
- createAggregator("odds", new SumIntegerFn());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
index 8460a7c..e379f11 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
@@ -235,7 +235,7 @@ public class DoFnTesterTest {
final PCollectionView<Integer> value =
PCollectionViews.singletonView(
TestPipeline.create(), WindowingStrategy.globalDefault(), true, 0, VarIntCoder.of());
- DoFn<Integer, Integer> fn = new SideInputDoFn(value);
+ OldDoFn<Integer, Integer> fn = new SideInputDoFn(value);
DoFnTester<Integer, Integer> tester = DoFnTester.of(fn);
@@ -251,7 +251,7 @@ public class DoFnTesterTest {
final PCollectionView<Integer> value =
PCollectionViews.singletonView(
TestPipeline.create(), WindowingStrategy.globalDefault(), true, 0, VarIntCoder.of());
- DoFn<Integer, Integer> fn = new SideInputDoFn(value);
+ OldDoFn<Integer, Integer> fn = new SideInputDoFn(value);
DoFnTester<Integer, Integer> tester = DoFnTester.of(fn);
tester.setSideInput(value, GlobalWindow.INSTANCE, -2);
@@ -264,7 +264,7 @@ public class DoFnTesterTest {
assertThat(tester.peekOutputElements(), containsInAnyOrder(-2, -2, -2, -2));
}
- private static class SideInputDoFn extends DoFn<Integer, Integer> {
+ private static class SideInputDoFn extends OldDoFn<Integer, Integer> {
private final PCollectionView<Integer> value;
private SideInputDoFn(PCollectionView<Integer> value) {
@@ -278,9 +278,9 @@ public class DoFnTesterTest {
}
/**
- * A DoFn that adds values to an aggregator and converts input to String in processElement.
+ * An OldDoFn that adds values to an aggregator and converts input to String in processElement.
*/
- private static class CounterDoFn extends DoFn<Long, String> {
+ private static class CounterDoFn extends OldDoFn<Long, String> {
Aggregator<Long, Long> agg = createAggregator("ctr", new Sum.SumLongFn());
private final long startBundleVal;
private final long finishBundleVal;