You are viewing a plain-text version of this content. The link to the canonical version ("here") was not preserved in this plain-text copy.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/08/25 18:55:31 UTC
[17/50] incubator-beam git commit: Port easy parts of DataflowRunner to new DoFn
Port easy parts of DataflowRunner to new DoFn
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/fcdd15b8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/fcdd15b8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/fcdd15b8
Branch: refs/heads/gearpump-runner
Commit: fcdd15b81b93f87de0aa02bfb3b09740bc259c4c
Parents: a1d601a
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Aug 8 20:35:59 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue Aug 9 12:41:52 2016 -0700
----------------------------------------------------------------------
.../beam/runners/dataflow/DataflowRunner.java | 92 ++++++++++----------
1 file changed, 45 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fcdd15b8/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index fadd9c7..4d34ec4 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -761,31 +761,30 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
PTransform<PCollection<T>, PCollection<KV<Integer, Iterable<KV<W, WindowedValue<T>>>>>> {
/**
- * A {@link OldDoFn} that for each element outputs a {@code KV} structure suitable for
+ * A {@link DoFn} that for each element outputs a {@code KV} structure suitable for
* grouping by the hash of the window's byte representation and sorting the grouped values
* using the window's byte representation.
*/
@SystemDoFnInternal
private static class UseWindowHashAsKeyAndWindowAsSortKeyDoFn<T, W extends BoundedWindow>
- extends OldDoFn<T, KV<Integer, KV<W, WindowedValue<T>>>> implements
- OldDoFn.RequiresWindowAccess {
+ extends DoFn<T, KV<Integer, KV<W, WindowedValue<T>>>> {
private final IsmRecordCoder<?> ismCoderForHash;
private UseWindowHashAsKeyAndWindowAsSortKeyDoFn(IsmRecordCoder<?> ismCoderForHash) {
this.ismCoderForHash = ismCoderForHash;
}
- @Override
- public void processElement(ProcessContext c) throws Exception {
+ @ProcessElement
+ public void processElement(ProcessContext c, BoundedWindow untypedWindow) throws Exception {
@SuppressWarnings("unchecked")
- W window = (W) c.window();
+ W window = (W) untypedWindow;
c.output(
KV.of(ismCoderForHash.hash(ImmutableList.of(window)),
KV.of(window,
WindowedValue.of(
c.element(),
c.timestamp(),
- c.window(),
+ window,
c.pane()))));
}
}
@@ -828,14 +827,14 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
extends PTransform<PCollection<T>, PCollectionView<T>> {
/**
- * A {@link OldDoFn} that outputs {@link IsmRecord}s. These records are structured as follows:
+ * A {@link DoFn} that outputs {@link IsmRecord}s. These records are structured as follows:
* <ul>
* <li>Key 1: Window
* <li>Value: Windowed value
* </ul>
*/
static class IsmRecordForSingularValuePerWindowDoFn<T, W extends BoundedWindow>
- extends OldDoFn<KV<Integer, Iterable<KV<W, WindowedValue<T>>>>,
+ extends DoFn<KV<Integer, Iterable<KV<W, WindowedValue<T>>>>,
IsmRecord<WindowedValue<T>>> {
private final Coder<W> windowCoder;
@@ -843,7 +842,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
this.windowCoder = windowCoder;
}
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) throws Exception {
Optional<Object> previousWindowStructuralValue = Optional.absent();
T previousValue = null;
@@ -902,7 +901,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
applyForSingleton(
DataflowRunner runner,
PCollection<T> input,
- OldDoFn<KV<Integer, Iterable<KV<W, WindowedValue<T>>>>,
+ DoFn<KV<Integer, Iterable<KV<W, WindowedValue<T>>>>,
IsmRecord<WindowedValue<FinalT>>> doFn,
boolean hasDefault,
FinalT defaultValue,
@@ -998,7 +997,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
static class BatchViewAsList<T>
extends PTransform<PCollection<T>, PCollectionView<List<T>>> {
/**
- * A {@link OldDoFn} which creates {@link IsmRecord}s assuming that each element is within the
+ * A {@link DoFn} which creates {@link IsmRecord}s assuming that each element is within the
* global window. Each {@link IsmRecord} has
* <ul>
* <li>Key 1: Global window</li>
@@ -1008,15 +1007,15 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
*/
@SystemDoFnInternal
static class ToIsmRecordForGlobalWindowDoFn<T>
- extends OldDoFn<T, IsmRecord<WindowedValue<T>>> {
+ extends DoFn<T, IsmRecord<WindowedValue<T>>> {
long indexInBundle;
- @Override
+ @StartBundle
public void startBundle(Context c) throws Exception {
indexInBundle = 0;
}
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) throws Exception {
c.output(IsmRecord.of(
ImmutableList.of(GlobalWindow.INSTANCE, indexInBundle),
@@ -1030,7 +1029,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
}
/**
- * A {@link OldDoFn} which creates {@link IsmRecord}s comparing successive elements windows
+ * A {@link DoFn} which creates {@link IsmRecord}s comparing successive elements windows
* to locate the window boundaries. The {@link IsmRecord} has:
* <ul>
* <li>Key 1: Window</li>
@@ -1040,7 +1039,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
*/
@SystemDoFnInternal
static class ToIsmRecordForNonGlobalWindowDoFn<T, W extends BoundedWindow>
- extends OldDoFn<KV<Integer, Iterable<KV<W, WindowedValue<T>>>>,
+ extends DoFn<KV<Integer, Iterable<KV<W, WindowedValue<T>>>>,
IsmRecord<WindowedValue<T>>> {
private final Coder<W> windowCoder;
@@ -1048,7 +1047,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
this.windowCoder = windowCoder;
}
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) throws Exception {
long elementsInWindow = 0;
Optional<Object> previousWindowStructuralValue = Optional.absent();
@@ -1174,7 +1173,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, V>>> {
/**
- * A {@link OldDoFn} which groups elements by window boundaries. For each group,
+ * A {@link DoFn} which groups elements by window boundaries. For each group,
* the group of elements is transformed into a {@link TransformedMap}.
* The transformed {@code Map<K, V>} is backed by a {@code Map<K, WindowedValue<V>>}
* and contains a function {@code WindowedValue<V> -> V}.
@@ -1188,7 +1187,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
* </ul>
*/
static class ToMapDoFn<K, V, W extends BoundedWindow>
- extends OldDoFn<KV<Integer, Iterable<KV<W, WindowedValue<KV<K, V>>>>>,
+ extends DoFn<KV<Integer, Iterable<KV<W, WindowedValue<KV<K, V>>>>>,
IsmRecord<WindowedValue<TransformedMap<K,
WindowedValue<V>,
V>>>> {
@@ -1198,7 +1197,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
this.windowCoder = windowCoder;
}
- @Override
+ @ProcessElement
public void processElement(ProcessContext c)
throws Exception {
Optional<Object> previousWindowStructuralValue = Optional.absent();
@@ -1358,18 +1357,17 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
@SystemDoFnInternal
private static class GroupByKeyHashAndSortByKeyAndWindowDoFn<K, V, W>
- extends OldDoFn<KV<K, V>, KV<Integer, KV<KV<K, W>, WindowedValue<V>>>>
- implements OldDoFn.RequiresWindowAccess {
+ extends DoFn<KV<K, V>, KV<Integer, KV<KV<K, W>, WindowedValue<V>>>> {
private final IsmRecordCoder<?> coder;
private GroupByKeyHashAndSortByKeyAndWindowDoFn(IsmRecordCoder<?> coder) {
this.coder = coder;
}
- @Override
- public void processElement(ProcessContext c) throws Exception {
+ @ProcessElement
+ public void processElement(ProcessContext c, BoundedWindow untypedWindow) throws Exception {
@SuppressWarnings("unchecked")
- W window = (W) c.window();
+ W window = (W) untypedWindow;
c.output(
KV.of(coder.hash(ImmutableList.of(c.element().getKey())),
@@ -1377,7 +1375,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
WindowedValue.of(
c.element().getValue(),
c.timestamp(),
- (BoundedWindow) window,
+ untypedWindow,
c.pane()))));
}
}
@@ -1412,7 +1410,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
}
/**
- * A {@link OldDoFn} which creates {@link IsmRecord}s comparing successive elements windows
+ * A {@link DoFn} which creates {@link IsmRecord}s comparing successive elements windows
* and keys to locate window and key boundaries. The main output {@link IsmRecord}s have:
* <ul>
* <li>Key 1: Window</li>
@@ -1424,11 +1422,11 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
* <p>Additionally, we output all the unique keys per window seen to {@code outputForEntrySet}
* and the unique key count per window to {@code outputForSize}.
*
- * <p>Finally, if this OldDoFn has been requested to perform unique key checking, it will
+ * <p>Finally, if this {@link DoFn} has been requested to perform unique key checking, it will
* throw an {@link IllegalStateException} if more than one key per window is found.
*/
static class ToIsmRecordForMapLikeDoFn<K, V, W extends BoundedWindow>
- extends OldDoFn<KV<Integer, Iterable<KV<KV<K, W>, WindowedValue<V>>>>,
+ extends DoFn<KV<Integer, Iterable<KV<KV<K, W>, WindowedValue<V>>>>,
IsmRecord<WindowedValue<V>>> {
private final TupleTag<KV<Integer, KV<W, Long>>> outputForSize;
@@ -1452,7 +1450,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
this.uniqueKeysExpected = uniqueKeysExpected;
}
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) throws Exception {
long currentKeyIndex = 0;
// We use one based indexing while counting
@@ -1557,7 +1555,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
}
/**
- * A {@link OldDoFn} which outputs a metadata {@link IsmRecord} per window of:
+ * A {@link DoFn} which outputs a metadata {@link IsmRecord} per window of:
* <ul>
* <li>Key 1: META key</li>
* <li>Key 2: window</li>
@@ -1565,17 +1563,17 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
* <li>Value: sum of values for window</li>
* </ul>
*
- * <p>This {@link OldDoFn} is meant to be used to compute the number of unique keys
+ * <p>This {@link DoFn} is meant to be used to compute the number of unique keys
* per window for map and multimap side inputs.
*/
static class ToIsmMetadataRecordForSizeDoFn<K, V, W extends BoundedWindow>
- extends OldDoFn<KV<Integer, Iterable<KV<W, Long>>>, IsmRecord<WindowedValue<V>>> {
+ extends DoFn<KV<Integer, Iterable<KV<W, Long>>>, IsmRecord<WindowedValue<V>>> {
private final Coder<W> windowCoder;
ToIsmMetadataRecordForSizeDoFn(Coder<W> windowCoder) {
this.windowCoder = windowCoder;
}
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) throws Exception {
Iterator<KV<W, Long>> iterator = c.element().getValue().iterator();
KV<W, Long> currentValue = iterator.next();
@@ -1606,7 +1604,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
}
/**
- * A {@link OldDoFn} which outputs a metadata {@link IsmRecord} per window and key pair of:
+ * A {@link DoFn} which outputs a metadata {@link IsmRecord} per window and key pair of:
* <ul>
* <li>Key 1: META key</li>
* <li>Key 2: window</li>
@@ -1614,11 +1612,11 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
* <li>Value: key</li>
* </ul>
*
- * <p>This {@link OldDoFn} is meant to be used to output index to key records
+ * <p>This {@link DoFn} is meant to be used to output index to key records
* per window for map and multimap side inputs.
*/
static class ToIsmMetadataRecordForKeyDoFn<K, V, W extends BoundedWindow>
- extends OldDoFn<KV<Integer, Iterable<KV<W, K>>>, IsmRecord<WindowedValue<V>>> {
+ extends DoFn<KV<Integer, Iterable<KV<W, K>>>, IsmRecord<WindowedValue<V>>> {
private final Coder<K> keyCoder;
private final Coder<W> windowCoder;
@@ -1627,7 +1625,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
this.windowCoder = windowCoder;
}
- @Override
+ @ProcessElement
public void processElement(ProcessContext c) throws Exception {
Iterator<KV<W, K>> iterator = c.element().getValue().iterator();
KV<W, K> currentValue = iterator.next();
@@ -1658,7 +1656,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
}
/**
- * A {@link OldDoFn} which partitions sets of elements by window boundaries. Within each
+ * A {@link DoFn} which partitions sets of elements by window boundaries. Within each
* partition, the set of elements is transformed into a {@link TransformedMap}.
* The transformed {@code Map<K, Iterable<V>>} is backed by a
* {@code Map<K, Iterable<WindowedValue<V>>>} and contains a function
@@ -1673,7 +1671,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
* </ul>
*/
static class ToMultimapDoFn<K, V, W extends BoundedWindow>
- extends OldDoFn<KV<Integer, Iterable<KV<W, WindowedValue<KV<K, V>>>>>,
+ extends DoFn<KV<Integer, Iterable<KV<W, WindowedValue<KV<K, V>>>>>,
IsmRecord<WindowedValue<TransformedMap<K,
Iterable<WindowedValue<V>>,
Iterable<V>>>>> {
@@ -1683,7 +1681,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
this.windowCoder = windowCoder;
}
- @Override
+ @ProcessElement
public void processElement(ProcessContext c)
throws Exception {
Optional<Object> previousWindowStructuralValue = Optional.absent();
@@ -2335,8 +2333,8 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
// WindmillSink.
.apply(Reshuffle.<Integer, ValueWithRecordId<T>>of())
.apply("StripIds", ParDo.of(
- new OldDoFn<KV<Integer, ValueWithRecordId<T>>, T>() {
- @Override
+ new DoFn<KV<Integer, ValueWithRecordId<T>>, T>() {
+ @ProcessElement
public void processElement(ProcessContext c) {
c.output(c.element().getValue().getValue());
}
@@ -2372,7 +2370,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
}
/**
- * A specialized {@link OldDoFn} for writing the contents of a {@link PCollection}
+ * A specialized {@link DoFn} for writing the contents of a {@link PCollection}
* to a streaming {@link PCollectionView} backend implementation.
*/
private static class StreamingPCollectionViewWriterFn<T>
@@ -2553,8 +2551,8 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
}
}
- private static class WrapAsList<T> extends OldDoFn<T, List<T>> {
- @Override
+ private static class WrapAsList<T> extends DoFn<T, List<T>> {
+ @ProcessElement
public void processElement(ProcessContext c) {
c.output(Arrays.asList(c.element()));
}