You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/11/12 02:28:22 UTC
[04/39] incubator-beam git commit: BEAM-261 Read.Bounded and
FlattenPCollection.
BEAM-261 Read.Bounded and FlattenPCollection.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/074b18f6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/074b18f6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/074b18f6
Branch: refs/heads/master
Commit: 074b18f6ae0cfc1a3cc986f89ded6a45e0a3eb57
Parents: a7e430d
Author: Thomas Weise <th...@apache.org>
Authored: Sun Sep 11 20:34:08 2016 -0700
Committer: Thomas Weise <th...@apache.org>
Committed: Sun Oct 16 23:25:28 2016 -0700
----------------------------------------------------------------------
runners/apex/pom.xml | 2 +-
.../runners/apex/ApexPipelineTranslator.java | 16 ++
.../apache/beam/runners/apex/ApexRunner.java | 10 +-
.../FlattenPCollectionTranslator.java | 53 ++++-
.../apex/translators/TranslationContext.java | 24 +--
.../functions/ApexGroupByKeyOperator.java | 6 +-
.../functions/ApexParDoOperator.java | 6 +-
.../beam/runners/apex/examples/IntTests.java | 207 +++++++++++++++++++
.../translators/ReadUnboundTranslatorTest.java | 2 +-
9 files changed, 284 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/074b18f6/runners/apex/pom.xml
----------------------------------------------------------------------
diff --git a/runners/apex/pom.xml b/runners/apex/pom.xml
index 21e53a8..e9377b4 100644
--- a/runners/apex/pom.xml
+++ b/runners/apex/pom.xml
@@ -28,7 +28,7 @@
<relativePath>../pom.xml</relativePath>
</parent>
- <artifactId>beam-runners-apex_3.4.0</artifactId>
+ <artifactId>beam-runners-apex</artifactId>
<name>Apache Beam :: Runners :: Apex</name>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/074b18f6/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java
index 8ea7139..b0391b4 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java
@@ -25,6 +25,8 @@ import org.apache.beam.runners.apex.translators.ParDoBoundTranslator;
import org.apache.beam.runners.apex.translators.ReadUnboundedTranslator;
import org.apache.beam.runners.apex.translators.TransformTranslator;
import org.apache.beam.runners.apex.translators.TranslationContext;
+import org.apache.beam.runners.apex.translators.io.ApexReadUnboundedInputOperator;
+import org.apache.beam.runners.core.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.runners.TransformTreeNode;
@@ -64,6 +66,7 @@ public class ApexPipelineTranslator implements Pipeline.PipelineVisitor {
// register TransformTranslators
registerTransformTranslator(ParDo.Bound.class, new ParDoBoundTranslator());
registerTransformTranslator(Read.Unbounded.class, new ReadUnboundedTranslator());
+ registerTransformTranslator(Read.Bounded.class, new ReadBoundedTranslator());
registerTransformTranslator(GroupByKey.class, new GroupByKeyTranslator());
registerTransformTranslator(Flatten.FlattenPCollectionList.class,
new FlattenPCollectionTranslator());
@@ -130,5 +133,18 @@ public class ApexPipelineTranslator implements Pipeline.PipelineVisitor {
return transformTranslators.get(transformClass);
}
+  /**
+   * Translates {@link Read.Bounded} by adapting its bounded source with
+   * {@link BoundedToUnboundedSourceAdapter} and reading the adapted source with an
+   * {@link ApexReadUnboundedInputOperator}, whose output port produces the transform's output.
+   */
+  private static class ReadBoundedTranslator<T> implements TransformTranslator<Read.Bounded<T>> {
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public void translate(Read.Bounded<T> transform, TranslationContext context) {
+      // TODO: adapter is visibleForTesting
+      // Parameterized (not raw) so the operator's element type is checked by the compiler.
+      BoundedToUnboundedSourceAdapter<T> unboundedSource =
+          new BoundedToUnboundedSourceAdapter<>(transform.getSource());
+      ApexReadUnboundedInputOperator<T, ?> operator = new ApexReadUnboundedInputOperator<>(
+          unboundedSource, context.getPipelineOptions());
+      context.addOperator(operator, operator.output);
+    }
+  }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/074b18f6/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
index 87c8f97..5fa3f23 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
@@ -20,10 +20,7 @@ package org.apache.beam.runners.apex;
import static com.google.common.base.Preconditions.checkArgument;
import org.apache.beam.runners.apex.translators.TranslationContext;
-import org.apache.beam.runners.core.UnboundedReadFromBoundedSource;
-import org.apache.beam.runners.core.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter;
import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.transforms.Create;
@@ -33,9 +30,8 @@ import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.util.AssignWindows;
+import org.apache.beam.runners.core.AssignWindows;
import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.POutput;
@@ -70,6 +66,8 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
@Override
public <OutputT extends POutput, InputT extends PInput> OutputT apply(
PTransform<InputT, OutputT> transform, InputT input) {
+//System.out.println("transform: " + transform);
+
if (Window.Bound.class.equals(transform.getClass())) {
return (OutputT) ((PCollection) input).apply(
new AssignWindowsAndSetStrategy((Window.Bound) transform));
@@ -79,8 +77,6 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
input.getPipeline(),
WindowingStrategy.globalDefault(),
PCollection.IsBounded.BOUNDED);
- } else if (Read.Bounded.class.equals(transform.getClass())) {
- return (OutputT) ((PBegin) input).apply(new UnboundedReadFromBoundedSource<>(((Read.Bounded)transform).getSource()));
} else {
return super.apply(transform, input);
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/074b18f6/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslator.java
index f228149..e153867 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslator.java
@@ -18,11 +18,15 @@
package org.apache.beam.runners.apex.translators;
+import java.util.List;
+
+import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import com.datatorrent.lib.stream.StreamMerger;
+import com.google.common.collect.Lists;
/**
* Flatten.FlattenPCollectionList translation to Apex operator.
@@ -34,19 +38,46 @@ public class FlattenPCollectionTranslator<T> implements
@Override
public void translate(Flatten.FlattenPCollectionList<T> transform, TranslationContext context) {
- StreamMerger<T> operator = null;
- PCollectionList<T> collections = context.getInput();
- if (collections.size() > 2) {
- throw new UnsupportedOperationException("Currently supports only 2 collections: " + transform);
- }
- for (PCollection<T> collection : collections.getAll()) {
- if (null == operator) {
- operator = new StreamMerger<T>();
- context.addStream(collection, operator.data1);
+ PCollection<T> firstCollection = null;
+ PCollectionList<T> input = context.getInput();
+ List<PCollection<T>> collections = input.getAll();
+ List<PCollection<T>> remainingCollections = Lists.newArrayList();
+ while (!collections.isEmpty()) {
+ for (PCollection<T> collection : collections) {
+ if (null == firstCollection) {
+ firstCollection = collection;
+ } else {
+ StreamMerger<T> operator = new StreamMerger<>();
+ context.addStream(firstCollection, operator.data1);
+ context.addStream(collection, operator.data2);
+ if (collections.size() > 2) {
+ PCollection<T> resultCollection = intermediateCollection(collection, collection.getCoder());
+ context.addOperator(operator, operator.out, resultCollection);
+ remainingCollections.add(resultCollection);
+ } else {
+ // final stream merge
+ context.addOperator(operator, operator.out);
+ }
+ firstCollection = null;
+ }
+ }
+ if (firstCollection != null) {
+ // push to next merge level
+ remainingCollections.add(firstCollection);
+ }
+ if (remainingCollections.size() > 1) {
+ collections = remainingCollections;
+ remainingCollections = Lists.newArrayList();
} else {
- context.addStream(collection, operator.data2);
+ collections = Lists.newArrayList();
}
}
- context.addOperator(operator, operator.out);
}
+
+ public static <T> PCollection<T> intermediateCollection(PCollection<T> input, Coder<T> outputCoder) {
+ PCollection<T> output = PCollection.createPrimitiveOutputInternal(input.getPipeline(), input.getWindowingStrategy(), input.isBounded());
+ output.setCoder(outputCoder);
+ return output;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/074b18f6/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TranslationContext.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TranslationContext.java
index 92afd58..ab7cd0a 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TranslationContext.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TranslationContext.java
@@ -82,30 +82,22 @@ public class TranslationContext {
}
public void addOperator(Operator operator, OutputPort port) {
- // Apex DAG requires a unique operator name
- // use the transform's name and make it unique
- String name = getCurrentTransform().getFullName();
- for (int i=1; this.operators.containsKey(name); name = getCurrentTransform().getFullName() + i++);
- this.operators.put(name, operator);
- PCollection<?> output = getOutput();
- this.streams.put(output, (Pair)new ImmutablePair<>(port, new ArrayList<>()));
+ addOperator(operator, port, this.<PCollection<?>>getOutput());
}
/**
- * Add operator that is internal to a transformation.
- * @param output
- * @param operator
- * @param port
- * @param name
+   * Adds an operator for the current transformation and registers {@code output} as the
+   * collection emitted on {@code port}.
+   *
+   * <p>The Apex DAG requires unique operator names. The operator is named after the current
+   * transform's full name; if that name is already taken, a numeric suffix (1, 2, ...) is
+   * appended until the name is unused.
+   *
+   * @param operator the Apex operator to add
+   * @param port output port of {@code operator}
+   * @param output the collection emitted on {@code port}
*/
- public <T> PInput addInternalOperator(Operator operator, OutputPort port, String name, Coder<T> coder) {
- checkArgument(this.operators.get(name) == null, "duplicate operator " + name);
+  public void addOperator(Operator operator, OutputPort port, PCollection<?> output) {
+    String baseName = getCurrentTransform().getFullName();
+    String name = baseName;
+    for (int suffix = 1; this.operators.containsKey(name); suffix++) {
+      name = baseName + suffix;
+    }
this.operators.put(name, operator);
- PCollection<T> input = getInput();
- PCollection<T> output = PCollection.createPrimitiveOutputInternal(input.getPipeline(), input.getWindowingStrategy(), input.isBounded());
- output.setCoder(coder);
this.streams.put(output, (Pair)new ImmutablePair<>(port, new ArrayList<>()));
- return output;
}
public void addStream(PInput input, InputPort inputPort) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/074b18f6/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexGroupByKeyOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexGroupByKeyOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexGroupByKeyOperator.java
index 4608c92..29e1b32 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexGroupByKeyOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexGroupByKeyOperator.java
@@ -31,6 +31,7 @@ import org.apache.beam.runners.apex.ApexPipelineOptions;
import org.apache.beam.runners.apex.translators.utils.ApexStreamTuple;
import org.apache.beam.runners.apex.translators.utils.SerializablePipelineOptions;
import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn;
+import org.apache.beam.runners.core.SystemReduceFn;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.options.PipelineOptions;
@@ -42,7 +43,6 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.KeyedWorkItem;
import org.apache.beam.sdk.util.KeyedWorkItems;
-import org.apache.beam.sdk.util.SystemReduceFn;
import org.apache.beam.sdk.util.TimerInternals;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingInternals;
@@ -56,14 +56,14 @@ import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.joda.time.Instant;
+import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.Operator;
import com.datatorrent.api.StreamCodec;
-import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
-import com.esotericsoftware.kryo.serializers.JavaSerializer;
import com.esotericsoftware.kryo.serializers.FieldSerializer.Bind;
+import com.esotericsoftware.kryo.serializers.JavaSerializer;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.HashMultimap;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/074b18f6/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexParDoOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexParDoOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexParDoOperator.java
index 8005832..d358d14 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexParDoOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexParDoOperator.java
@@ -22,14 +22,14 @@ import org.apache.beam.runners.apex.ApexPipelineOptions;
import org.apache.beam.runners.apex.translators.utils.ApexStreamTuple;
import org.apache.beam.runners.apex.translators.utils.NoOpStepContext;
import org.apache.beam.runners.apex.translators.utils.SerializablePipelineOptions;
+import org.apache.beam.runners.core.DoFnRunner;
+import org.apache.beam.runners.core.DoFnRunners;
+import org.apache.beam.runners.core.DoFnRunners.OutputManager;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Aggregator.AggregatorFactory;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.OldDoFn;
-import org.apache.beam.sdk.util.DoFnRunner;
-import org.apache.beam.sdk.util.DoFnRunners;
-import org.apache.beam.sdk.util.DoFnRunners.OutputManager;
import org.apache.beam.sdk.util.ExecutionContext;
import org.apache.beam.sdk.util.SideInputReader;
import org.apache.beam.sdk.util.WindowedValue;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/074b18f6/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/IntTests.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/IntTests.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/IntTests.java
new file mode 100644
index 0000000..0ee3442
--- /dev/null
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/IntTests.java
@@ -0,0 +1,207 @@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.apex.examples;
+
+
+ import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+ import static org.hamcrest.Matchers.is;
+ import static org.junit.Assert.assertThat;
+
+import org.apache.beam.runners.apex.ApexPipelineOptions;
+import org.apache.beam.runners.apex.TestApexRunner;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.CountingInput;
+import org.apache.beam.sdk.io.CountingInput.UnboundedCountingInput;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.NeedsRunner;
+ import org.apache.beam.sdk.testing.PAssert;
+ import org.apache.beam.sdk.testing.RunnableOnService;
+ import org.apache.beam.sdk.testing.TestPipeline;
+ import org.apache.beam.sdk.transforms.Count;
+ import org.apache.beam.sdk.transforms.DoFn;
+ import org.apache.beam.sdk.transforms.Max;
+ import org.apache.beam.sdk.transforms.Min;
+ import org.apache.beam.sdk.transforms.PTransform;
+ import org.apache.beam.sdk.transforms.ParDo;
+ import org.apache.beam.sdk.transforms.RemoveDuplicates;
+ import org.apache.beam.sdk.transforms.SerializableFunction;
+ import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.PCollection;
+ import org.joda.time.Duration;
+ import org.joda.time.Instant;
+ import org.junit.Test;
+ import org.junit.experimental.categories.Category;
+ import org.junit.runner.RunWith;
+ import org.junit.runners.JUnit4;
+
+ /**
+ * Tests for {@link CountingInput}.
+ */
+ @RunWith(JUnit4.class)
+ public class IntTests {
+ public static void addCountingAsserts(PCollection<Long> input, long numElements) {
+ // Count == numElements
+ PAssert.thatSingleton(input.apply("Count", Count.<Long>globally()))
+ .isEqualTo(numElements);
+ // Unique count == numElements
+ PAssert.thatSingleton(
+ input
+ .apply(RemoveDuplicates.<Long>create())
+ .apply("UniqueCount", Count.<Long>globally()))
+ .isEqualTo(numElements);
+ // Min == 0
+ PAssert.thatSingleton(input.apply("Min", Min.<Long>globally())).isEqualTo(0L);
+ // Max == numElements-1
+ PAssert.thatSingleton(input.apply("Max", Max.<Long>globally()))
+ .isEqualTo(numElements - 1);
+ }
+
+ @Test
+ @Category(RunnableOnService.class)
+ public void testBoundedInput() {
+ //Pipeline p = TestPipeline.create();
+ ApexPipelineOptions options = PipelineOptionsFactory.as(ApexPipelineOptions.class);
+ options.setRunner(TestApexRunner.class);
+ Pipeline p = Pipeline.create(options);
+
+ long numElements = 1000;
+ PCollection<Long> input = p.apply(CountingInput.upTo(numElements));
+
+ addCountingAsserts(input, numElements);
+ p.run();
+ }
+
+ @Test
+ public void testBoundedDisplayData() {
+ PTransform<?, ?> input = CountingInput.upTo(1234);
+ DisplayData displayData = DisplayData.from(input);
+ assertThat(displayData, hasDisplayItem("upTo", 1234));
+ }
+
+ @Test
+ @Category(RunnableOnService.class)
+ public void testUnboundedInput() {
+ //Pipeline p = TestPipeline.create();
+ ApexPipelineOptions options = PipelineOptionsFactory.as(ApexPipelineOptions.class);
+ options.setRunner(TestApexRunner.class);
+ Pipeline p = Pipeline.create(options);
+
+
+ long numElements = 1000;
+
+ PCollection<Long> input = p.apply(CountingInput.unbounded().withMaxNumRecords(numElements));
+
+// input = input.apply(Window.<Long>into(FixedWindows.of(Duration.standardSeconds(10))));
+
+ addCountingAsserts(input, numElements);
+ p.run();
+ }
+
+ @Test
+ @Category(NeedsRunner.class)
+ public void testUnboundedInputRate() {
+ Pipeline p = TestPipeline.create();
+ long numElements = 5000;
+
+ long elemsPerPeriod = 10L;
+ Duration periodLength = Duration.millis(8);
+ PCollection<Long> input =
+ p.apply(
+ CountingInput.unbounded()
+ .withRate(elemsPerPeriod, periodLength)
+ .withMaxNumRecords(numElements));
+
+ addCountingAsserts(input, numElements);
+ long expectedRuntimeMillis = (periodLength.getMillis() * numElements) / elemsPerPeriod;
+ Instant startTime = Instant.now();
+ p.run();
+ Instant endTime = Instant.now();
+ assertThat(endTime.isAfter(startTime.plus(expectedRuntimeMillis)), is(true));
+ }
+
+ private static class ElementValueDiff extends DoFn<Long, Long> {
+ @ProcessElement
+ public void processElement(ProcessContext c) throws Exception {
+ c.output(c.element() - c.timestamp().getMillis());
+ }
+ }
+
+ @Test
+ @Category(RunnableOnService.class)
+ public void testUnboundedInputTimestamps() {
+ Pipeline p = TestPipeline.create();
+ long numElements = 1000;
+
+ PCollection<Long> input =
+ p.apply(
+ CountingInput.unbounded()
+ .withTimestampFn(new ValueAsTimestampFn())
+ .withMaxNumRecords(numElements));
+ addCountingAsserts(input, numElements);
+
+ PCollection<Long> diffs =
+ input
+ .apply("TimestampDiff", ParDo.of(new ElementValueDiff()))
+ .apply("RemoveDuplicateTimestamps", RemoveDuplicates.<Long>create());
+ // This assert also confirms that diffs only has one unique value.
+ PAssert.thatSingleton(diffs).isEqualTo(0L);
+
+ p.run();
+ }
+
+ @Test
+ public void testUnboundedDisplayData() {
+ Duration maxReadTime = Duration.standardHours(5);
+ SerializableFunction<Long, Instant> timestampFn = new SerializableFunction<Long, Instant>() {
+ @Override
+ public Instant apply(Long input) {
+ return Instant.now();
+ }
+ };
+
+ PTransform<?, ?> input = CountingInput.unbounded()
+ .withMaxNumRecords(1234)
+ .withMaxReadTime(maxReadTime)
+ .withTimestampFn(timestampFn);
+
+ DisplayData displayData = DisplayData.from(input);
+
+ assertThat(displayData, hasDisplayItem("maxRecords", 1234));
+ assertThat(displayData, hasDisplayItem("maxReadTime", maxReadTime));
+ assertThat(displayData, hasDisplayItem("timestampFn", timestampFn.getClass()));
+ }
+
+ /**
+ * A timestamp function that uses the given value as the timestamp. Because the input values will
+ * not wrap, this function is non-decreasing and meets the timestamp function criteria laid out
+ * in {@link UnboundedCountingInput#withTimestampFn(SerializableFunction)}.
+ */
+ private static class ValueAsTimestampFn implements SerializableFunction<Long, Instant> {
+ @Override
+ public Instant apply(Long input) {
+ return new Instant(input);
+ }
+ }
+
+
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/074b18f6/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ReadUnboundTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ReadUnboundTranslatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ReadUnboundTranslatorTest.java
index 6260632..f954537 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ReadUnboundTranslatorTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ReadUnboundTranslatorTest.java
@@ -99,7 +99,7 @@ public class ReadUnboundTranslatorTest {
ApexRunnerResult result = (ApexRunnerResult)p.run();
DAG dag = result.getApexDAG();
- DAG.OperatorMeta om = dag.getOperatorMeta("Read(BoundedCountingSource)/Read(BoundedCountingSource)/Read(BoundedToUnboundedSourceAdapter)");
+ DAG.OperatorMeta om = dag.getOperatorMeta("Read(BoundedCountingSource)");
Assert.assertNotNull(om);
Assert.assertEquals(om.getOperator().getClass(), ApexReadUnboundedInputOperator.class);