You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/04/05 22:56:38 UTC
[4/8] incubator-beam git commit: fixup! Move GroupByKey expansion
into DirectPipelineRunner
fixup! Move GroupByKey expansion into DirectPipelineRunner
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/bcc010c2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/bcc010c2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/bcc010c2
Branch: refs/heads/master
Commit: bcc010c2ffc3515321c0cde8eedc4cb18f29be38
Parents: 080eeaa
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Mar 28 12:04:38 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Mar 28 12:30:04 2016 -0700
----------------------------------------------------------------------
.../FlinkBatchTransformTranslators.java | 21 +-
.../beam/runners/spark/SparkPipelineRunner.java | 20 ++
.../sdk/runners/DirectPipelineRunner.java | 140 +-----------
.../inprocess/GroupByKeyEvaluatorFactory.java | 2 +-
.../cloud/dataflow/sdk/util/GroupByKeyOnly.java | 43 ----
.../sdk/util/GroupByKeyViaGroupByKeyOnly.java | 226 +++++++++++++++++++
.../sdk/util/ReifyTimestampsAndWindows.java | 48 ----
.../GroupByKeyEvaluatorFactoryTest.java | 2 +-
8 files changed, 253 insertions(+), 249 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bcc010c2/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
index b09d033..b3c0cea 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
@@ -53,7 +53,6 @@ import com.google.cloud.dataflow.sdk.transforms.join.CoGbkResultSchema;
import com.google.cloud.dataflow.sdk.transforms.join.CoGroupByKey;
import com.google.cloud.dataflow.sdk.transforms.join.KeyedPCollectionTuple;
import com.google.cloud.dataflow.sdk.transforms.join.RawUnionValue;
-import com.google.cloud.dataflow.sdk.util.GroupByKeyOnly;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.cloud.dataflow.sdk.values.PCollectionView;
@@ -113,7 +112,6 @@ public class FlinkBatchTransformTranslators {
TRANSLATORS.put(Flatten.FlattenPCollectionList.class, new FlattenPCollectionTranslatorBatch());
- TRANSLATORS.put(GroupByKeyOnly.class, new GroupByKeyOnlyTranslatorBatch());
// TODO we're currently ignoring windows here but that has to change in the future
TRANSLATORS.put(GroupByKey.class, new GroupByKeyTranslatorBatch());
@@ -303,25 +301,8 @@ public class FlinkBatchTransformTranslators {
}
}
- private static class GroupByKeyOnlyTranslatorBatch<K, V> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<GroupByKeyOnly<K, V>> {
-
- @Override
- public void translateNode(GroupByKeyOnly<K, V> transform, FlinkBatchTranslationContext context) {
- DataSet<KV<K, V>> inputDataSet = context.getInputDataSet(context.getInput(transform));
- GroupReduceFunction<KV<K, V>, KV<K, Iterable<V>>> groupReduceFunction = new FlinkKeyedListAggregationFunction<>();
-
- TypeInformation<KV<K, Iterable<V>>> typeInformation = context.getTypeInfo(context.getOutput(transform));
-
- Grouping<KV<K, V>> grouping = new UnsortedGrouping<>(inputDataSet, new Keys.ExpressionKeys<>(new String[]{"key"}, inputDataSet.getType()));
-
- GroupReduceOperator<KV<K, V>, KV<K, Iterable<V>>> outputDataSet =
- new GroupReduceOperator<>(grouping, typeInformation, groupReduceFunction, transform.getName());
- context.setOutputDataSet(context.getOutput(transform), outputDataSet);
- }
- }
-
/**
- * Translates a GroupByKey while ignoring window assignments. This is identical to the {@link GroupByKeyOnlyTranslatorBatch}
+ * Translates a GroupByKey. Currently ignores window assignments.
*/
private static class GroupByKeyTranslatorBatch<K, V> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<GroupByKey<K, V>> {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bcc010c2/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
index d5e4186..71e358c 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
@@ -23,7 +23,10 @@ import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsValidator;
import com.google.cloud.dataflow.sdk.runners.PipelineRunner;
import com.google.cloud.dataflow.sdk.runners.TransformTreeNode;
+import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.util.GroupByKeyViaGroupByKeyOnly;
+import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.cloud.dataflow.sdk.values.PInput;
import com.google.cloud.dataflow.sdk.values.POutput;
import com.google.cloud.dataflow.sdk.values.PValue;
@@ -105,6 +108,23 @@ public final class SparkPipelineRunner extends PipelineRunner<EvaluationResult>
}
/**
+ * Overrides for this runner.
+ */
+ @SuppressWarnings("rawtypes")
+ @Override
+ public <OT extends POutput, IT extends PInput> OT apply(
+ PTransform<IT, OT> transform, IT input) {
+
+ if (transform instanceof GroupByKey) {
+ return (OT) ((PCollection) input).apply(
+ new GroupByKeyViaGroupByKeyOnly((GroupByKey) transform));
+ } else {
+ return super.apply(transform, input);
+ }
+ }
+
+
+ /**
* No parameter constructor defaults to running this pipeline in Spark's local mode, in a single
* thread.
*/
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bcc010c2/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DirectPipelineRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DirectPipelineRunner.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DirectPipelineRunner.java
index 629be83..3940d32 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DirectPipelineRunner.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DirectPipelineRunner.java
@@ -26,8 +26,6 @@ import com.google.cloud.dataflow.sdk.PipelineResult;
import com.google.cloud.dataflow.sdk.coders.CannotProvideCoderException;
import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.coders.CoderException;
-import com.google.cloud.dataflow.sdk.coders.IterableCoder;
-import com.google.cloud.dataflow.sdk.coders.KvCoder;
import com.google.cloud.dataflow.sdk.coders.ListCoder;
import com.google.cloud.dataflow.sdk.io.AvroIO;
import com.google.cloud.dataflow.sdk.io.FileBasedSink;
@@ -49,19 +47,15 @@ import com.google.cloud.dataflow.sdk.transforms.Partition;
import com.google.cloud.dataflow.sdk.transforms.Partition.PartitionFn;
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
import com.google.cloud.dataflow.sdk.util.AppliedCombineFn;
-import com.google.cloud.dataflow.sdk.util.GroupAlsoByWindowsViaOutputBufferDoFn;
-import com.google.cloud.dataflow.sdk.util.GroupByKeyOnly;
+import com.google.cloud.dataflow.sdk.util.GroupByKeyViaGroupByKeyOnly;
+import com.google.cloud.dataflow.sdk.util.GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly;
import com.google.cloud.dataflow.sdk.util.IOChannelUtils;
import com.google.cloud.dataflow.sdk.util.MapAggregatorValues;
import com.google.cloud.dataflow.sdk.util.PerKeyCombineFnRunner;
import com.google.cloud.dataflow.sdk.util.PerKeyCombineFnRunners;
-import com.google.cloud.dataflow.sdk.util.ReifyTimestampsAndWindows;
import com.google.cloud.dataflow.sdk.util.SerializableUtils;
-import com.google.cloud.dataflow.sdk.util.SystemReduceFn;
import com.google.cloud.dataflow.sdk.util.TestCredential;
import com.google.cloud.dataflow.sdk.util.WindowedValue;
-import com.google.cloud.dataflow.sdk.util.WindowedValue.WindowedValueCoder;
-import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
import com.google.cloud.dataflow.sdk.util.common.Counter;
import com.google.cloud.dataflow.sdk.util.common.CounterSet;
import com.google.cloud.dataflow.sdk.values.KV;
@@ -85,7 +79,6 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
-import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -259,7 +252,8 @@ public class DirectPipelineRunner
} else if (transform instanceof AvroIO.Write.Bound) {
return (OutputT) applyAvroIOWrite((AvroIO.Write.Bound) transform, (PCollection<?>) input);
} else if (transform instanceof GroupByKey) {
- return (OutputT) ((PCollection) input).apply(new DirectGroupByKey((GroupByKey) transform));
+ return (OutputT)
+ ((PCollection) input).apply(new GroupByKeyViaGroupByKeyOnly((GroupByKey) transform));
} else {
return super.apply(transform, input);
}
@@ -405,43 +399,6 @@ public class DirectPipelineRunner
}
}
- private static class DirectGroupByKey<K, V>
- extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> {
-
- private GroupByKey<K, V> originalTransform;
-
- public DirectGroupByKey(GroupByKey<K, V> originalTransform) {
- this.originalTransform = originalTransform;
- }
-
- @Override
- public PCollection<KV<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) {
- WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
-
- return input
- // Make each input element's timestamp and assigned windows
- // explicit, in the value part.
- .apply(new ReifyTimestampsAndWindows<K, V>())
-
- // Group by just the key.
- // Combiner lifting will not happen regardless of the disallowCombinerLifting value.
- // There will be no combiners right after the GroupByKeyOnly because of the two ParDos
- // introduced in here.
- .apply(new GroupByKeyOnly<K, WindowedValue<V>>())
-
- // Sort each key's values by timestamp. GroupAlsoByWindow requires
- // its input to be sorted by timestamp.
- .apply(new DirectPipelineRunner.SortValuesByTimestamp<K, V>())
-
- // Group each key's values by window, merging windows as needed.
- .apply(new DirectPipelineRunner.GroupAlsoByWindow<K, V>(windowingStrategy))
-
- // And update the windowing strategy as appropriate.
- .setWindowingStrategyInternal(
- originalTransform.updateWindowingStrategy(windowingStrategy));
- }
- }
-
/**
* Apply the override for AvroIO.Write.Bound if the user requested sharding controls
* greater than one.
@@ -1172,95 +1129,6 @@ public class DirectPipelineRunner
/////////////////////////////////////////////////////////////////////////////
/**
- * Helper transform that sorts the values associated with each key
- * by timestamp.
- */
- private static class SortValuesByTimestamp<K, V>
- extends PTransform<PCollection<KV<K, Iterable<WindowedValue<V>>>>,
- PCollection<KV<K, Iterable<WindowedValue<V>>>>> {
- @Override
- public PCollection<KV<K, Iterable<WindowedValue<V>>>> apply(
- PCollection<KV<K, Iterable<WindowedValue<V>>>> input) {
- return input.apply(ParDo.of(
- new DoFn<KV<K, Iterable<WindowedValue<V>>>,
- KV<K, Iterable<WindowedValue<V>>>>() {
- @Override
- public void processElement(ProcessContext c) {
- KV<K, Iterable<WindowedValue<V>>> kvs = c.element();
- K key = kvs.getKey();
- Iterable<WindowedValue<V>> unsortedValues = kvs.getValue();
- List<WindowedValue<V>> sortedValues = new ArrayList<>();
- for (WindowedValue<V> value : unsortedValues) {
- sortedValues.add(value);
- }
- Collections.sort(sortedValues,
- new Comparator<WindowedValue<V>>() {
- @Override
- public int compare(WindowedValue<V> e1, WindowedValue<V> e2) {
- return e1.getTimestamp().compareTo(e2.getTimestamp());
- }
- });
- c.output(KV.<K, Iterable<WindowedValue<V>>>of(key, sortedValues));
- }}))
- .setCoder(input.getCoder());
- }
- }
-
- /**
- * Helper transform that takes a collection of timestamp-ordered
- * values associated with each key, groups the values by window,
- * combines windows as needed, and for each window in each key,
- * outputs a collection of key/value-list pairs implicitly assigned
- * to the window and with the timestamp derived from that window.
- */
- private static class GroupAlsoByWindow<K, V>
- extends PTransform<PCollection<KV<K, Iterable<WindowedValue<V>>>>,
- PCollection<KV<K, Iterable<V>>>> {
- private final WindowingStrategy<?, ?> windowingStrategy;
-
- public GroupAlsoByWindow(WindowingStrategy<?, ?> windowingStrategy) {
- this.windowingStrategy = windowingStrategy;
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public PCollection<KV<K, Iterable<V>>> apply(
- PCollection<KV<K, Iterable<WindowedValue<V>>>> input) {
- @SuppressWarnings("unchecked")
- KvCoder<K, Iterable<WindowedValue<V>>> inputKvCoder =
- (KvCoder<K, Iterable<WindowedValue<V>>>) input.getCoder();
-
- Coder<K> keyCoder = inputKvCoder.getKeyCoder();
- Coder<Iterable<WindowedValue<V>>> inputValueCoder =
- inputKvCoder.getValueCoder();
-
- IterableCoder<WindowedValue<V>> inputIterableValueCoder =
- (IterableCoder<WindowedValue<V>>) inputValueCoder;
- Coder<WindowedValue<V>> inputIterableElementCoder =
- inputIterableValueCoder.getElemCoder();
- WindowedValueCoder<V> inputIterableWindowedValueCoder =
- (WindowedValueCoder<V>) inputIterableElementCoder;
-
- Coder<V> inputIterableElementValueCoder =
- inputIterableWindowedValueCoder.getValueCoder();
- Coder<Iterable<V>> outputValueCoder =
- IterableCoder.of(inputIterableElementValueCoder);
- Coder<KV<K, Iterable<V>>> outputKvCoder = KvCoder.of(keyCoder, outputValueCoder);
-
- return input
- .apply(ParDo.of(groupAlsoByWindowsFn(windowingStrategy, inputIterableElementValueCoder)))
- .setCoder(outputKvCoder);
- }
-
- private <W extends BoundedWindow> GroupAlsoByWindowsViaOutputBufferDoFn<K, V, Iterable<V>, W>
- groupAlsoByWindowsFn(
- WindowingStrategy<?, W> strategy, Coder<V> inputIterableElementValueCoder) {
- return new GroupAlsoByWindowsViaOutputBufferDoFn<K, V, Iterable<V>, W>(
- strategy, SystemReduceFn.<K, V, W>buffering(inputIterableElementValueCoder));
- }
- }
-
- /**
* The key by which GBK groups inputs - elements are grouped by the encoded form of the key,
* but the original key may be accessed as well.
*/
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bcc010c2/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactory.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactory.java
index b59ec56..4f97db0 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactory.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactory.java
@@ -31,10 +31,10 @@ import com.google.cloud.dataflow.sdk.transforms.PTransform;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
import com.google.cloud.dataflow.sdk.util.GroupAlsoByWindowViaWindowSetDoFn;
+import com.google.cloud.dataflow.sdk.util.GroupByKeyViaGroupByKeyOnly.ReifyTimestampsAndWindows;
import com.google.cloud.dataflow.sdk.util.KeyedWorkItem;
import com.google.cloud.dataflow.sdk.util.KeyedWorkItemCoder;
import com.google.cloud.dataflow.sdk.util.KeyedWorkItems;
-import com.google.cloud.dataflow.sdk.util.ReifyTimestampsAndWindows;
import com.google.cloud.dataflow.sdk.util.SystemReduceFn;
import com.google.cloud.dataflow.sdk.util.WindowedValue;
import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bcc010c2/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/GroupByKeyOnly.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/GroupByKeyOnly.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/GroupByKeyOnly.java
deleted file mode 100644
index 8db87d2..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/GroupByKeyOnly.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-
-/**
- * Runner-specific primitive that groups by key only, ignoring any window assignments.
- */
-public class GroupByKeyOnly<K, V>
- extends PTransform<PCollection<KV<K, V>>,
- PCollection<KV<K, Iterable<V>>>> {
-
- @SuppressWarnings({"rawtypes", "unchecked"})
- @Override
- public PCollection<KV<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) {
- return PCollection.<KV<K, Iterable<V>>>createPrimitiveOutputInternal(
- input.getPipeline(), input.getWindowingStrategy(), input.isBounded());
- }
-
- @Override
- public Coder<KV<K, Iterable<V>>> getDefaultOutputCoder(PCollection<KV<K, V>> input) {
- return GroupByKey.getOutputKvCoder(input.getCoder());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bcc010c2/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/GroupByKeyViaGroupByKeyOnly.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/GroupByKeyViaGroupByKeyOnly.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/GroupByKeyViaGroupByKeyOnly.java
new file mode 100644
index 0000000..4ce042d
--- /dev/null
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/GroupByKeyViaGroupByKeyOnly.java
@@ -0,0 +1,226 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.google.cloud.dataflow.sdk.util;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import com.google.cloud.dataflow.sdk.coders.IterableCoder;
+import com.google.cloud.dataflow.sdk.coders.KvCoder;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
+import com.google.cloud.dataflow.sdk.util.WindowedValue.FullWindowedValueCoder;
+import com.google.cloud.dataflow.sdk.util.WindowedValue.WindowedValueCoder;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+/**
+ * An implementation of {@link GroupByKey} built on top of a simpler {@link GroupByKeyOnly}
+ * primitive.
+ *
+ * <p>This implementation of {@link GroupByKey} proceeds by reifying windows and timestamps (making
+ * them part of the element rather than metadata), performing a {@link GroupByKeyOnly} primitive,
+ * then using a {@link GroupAlsoByWindow} transform to further group the resulting elements by
+ * window.
+ *
+ * <p>Today {@link GroupAlsoByWindow} is implemented as a {@link ParDo} that calls reserved
+ * internal methods.
+ */
+public class GroupByKeyViaGroupByKeyOnly<K, V>
+ extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> {
+
+ private GroupByKey<K, V> gbkTransform;
+
+ public GroupByKeyViaGroupByKeyOnly(GroupByKey<K, V> originalTransform) {
+ this.gbkTransform = originalTransform;
+ }
+
+ @Override
+ public PCollection<KV<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) {
+ WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
+
+ return input
+ // Make each input element's timestamp and assigned windows
+ // explicit, in the value part.
+ .apply(new ReifyTimestampsAndWindows<K, V>())
+
+ // Group by just the key.
+ // Combiner lifting will not happen regardless of the disallowCombinerLifting value.
+ // There will be no combiners right after the GroupByKeyOnly because of the two ParDos
+ // introduced in here.
+ .apply(new GroupByKeyOnly<K, WindowedValue<V>>())
+
+ // Sort each key's values by timestamp. GroupAlsoByWindow requires
+ // its input to be sorted by timestamp.
+ .apply(new SortValuesByTimestamp<K, V>())
+
+ // Group each key's values by window, merging windows as needed.
+ .apply(new GroupAlsoByWindow<K, V>(windowingStrategy))
+
+ // And update the windowing strategy as appropriate.
+ .setWindowingStrategyInternal(
+ gbkTransform.updateWindowingStrategy(windowingStrategy));
+ }
+
+ /**
+ * Runner-specific primitive that groups by key only, ignoring any window assignments. A
+ * runner that uses {@link GroupByKeyViaGroupByKeyOnly} should have a primitive way to translate
+ * or evaluate this class.
+ */
+ public static class GroupByKeyOnly<K, V>
+ extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> {
+
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ @Override
+ public PCollection<KV<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) {
+ return PCollection.<KV<K, Iterable<V>>>createPrimitiveOutputInternal(
+ input.getPipeline(), input.getWindowingStrategy(), input.isBounded());
+ }
+
+ @Override
+ public Coder<KV<K, Iterable<V>>> getDefaultOutputCoder(PCollection<KV<K, V>> input) {
+ return GroupByKey.getOutputKvCoder(input.getCoder());
+ }
+ }
+
+ /**
+ * Helper transform that makes timestamps and window assignments
+ * explicit in the value part of each key/value pair.
+ */
+ public static class ReifyTimestampsAndWindows<K, V>
+ extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, WindowedValue<V>>>> {
+ @Override
+ public PCollection<KV<K, WindowedValue<V>>> apply(PCollection<KV<K, V>> input) {
+
+ // The requirement to use a KvCoder *is* actually a model-level requirement, not specific
+ // to this implementation of GBK. All runners need a way to get the key.
+ checkArgument(input.getCoder() instanceof KvCoder,
+ "%s requires its input to use a %s",
+ GroupByKey.class.getSimpleName(),
+ KvCoder.class.getSimpleName());
+
+ @SuppressWarnings("unchecked")
+ KvCoder<K, V> inputKvCoder = (KvCoder<K, V>) input.getCoder();
+ Coder<K> keyCoder = inputKvCoder.getKeyCoder();
+ Coder<V> inputValueCoder = inputKvCoder.getValueCoder();
+ Coder<WindowedValue<V>> outputValueCoder =
+ FullWindowedValueCoder.of(
+ inputValueCoder, input.getWindowingStrategy().getWindowFn().windowCoder());
+ Coder<KV<K, WindowedValue<V>>> outputKvCoder = KvCoder.of(keyCoder, outputValueCoder);
+ return input
+ .apply(ParDo.of(new ReifyTimestampAndWindowsDoFn<K, V>()))
+ .setCoder(outputKvCoder);
+ }
+ }
+
+ /**
+ * Helper transform that sorts the values associated with each key
+ * by timestamp.
+ */
+ private static class SortValuesByTimestamp<K, V>
+ extends PTransform<
+ PCollection<KV<K, Iterable<WindowedValue<V>>>>,
+ PCollection<KV<K, Iterable<WindowedValue<V>>>>> {
+ @Override
+ public PCollection<KV<K, Iterable<WindowedValue<V>>>> apply(
+ PCollection<KV<K, Iterable<WindowedValue<V>>>> input) {
+ return input
+ .apply(
+ ParDo.of(
+ new DoFn<KV<K, Iterable<WindowedValue<V>>>, KV<K, Iterable<WindowedValue<V>>>>() {
+ @Override
+ public void processElement(ProcessContext c) {
+ KV<K, Iterable<WindowedValue<V>>> kvs = c.element();
+ K key = kvs.getKey();
+ Iterable<WindowedValue<V>> unsortedValues = kvs.getValue();
+ List<WindowedValue<V>> sortedValues = new ArrayList<>();
+ for (WindowedValue<V> value : unsortedValues) {
+ sortedValues.add(value);
+ }
+ Collections.sort(
+ sortedValues,
+ new Comparator<WindowedValue<V>>() {
+ @Override
+ public int compare(WindowedValue<V> e1, WindowedValue<V> e2) {
+ return e1.getTimestamp().compareTo(e2.getTimestamp());
+ }
+ });
+ c.output(KV.<K, Iterable<WindowedValue<V>>>of(key, sortedValues));
+ }
+ }))
+ .setCoder(input.getCoder());
+ }
+ }
+
+ /**
+ * Helper transform that takes a collection of timestamp-ordered
+ * values associated with each key, groups the values by window,
+ * combines windows as needed, and for each window in each key,
+ * outputs a collection of key/value-list pairs implicitly assigned
+ * to the window and with the timestamp derived from that window.
+ */
+ private static class GroupAlsoByWindow<K, V>
+ extends PTransform<
+ PCollection<KV<K, Iterable<WindowedValue<V>>>>, PCollection<KV<K, Iterable<V>>>> {
+ private final WindowingStrategy<?, ?> windowingStrategy;
+
+ public GroupAlsoByWindow(WindowingStrategy<?, ?> windowingStrategy) {
+ this.windowingStrategy = windowingStrategy;
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public PCollection<KV<K, Iterable<V>>> apply(
+ PCollection<KV<K, Iterable<WindowedValue<V>>>> input) {
+ @SuppressWarnings("unchecked")
+ KvCoder<K, Iterable<WindowedValue<V>>> inputKvCoder =
+ (KvCoder<K, Iterable<WindowedValue<V>>>) input.getCoder();
+
+ Coder<K> keyCoder = inputKvCoder.getKeyCoder();
+ Coder<Iterable<WindowedValue<V>>> inputValueCoder = inputKvCoder.getValueCoder();
+
+ IterableCoder<WindowedValue<V>> inputIterableValueCoder =
+ (IterableCoder<WindowedValue<V>>) inputValueCoder;
+ Coder<WindowedValue<V>> inputIterableElementCoder = inputIterableValueCoder.getElemCoder();
+ WindowedValueCoder<V> inputIterableWindowedValueCoder =
+ (WindowedValueCoder<V>) inputIterableElementCoder;
+
+ Coder<V> inputIterableElementValueCoder = inputIterableWindowedValueCoder.getValueCoder();
+ Coder<Iterable<V>> outputValueCoder = IterableCoder.of(inputIterableElementValueCoder);
+ Coder<KV<K, Iterable<V>>> outputKvCoder = KvCoder.of(keyCoder, outputValueCoder);
+
+ return input
+ .apply(ParDo.of(groupAlsoByWindowsFn(windowingStrategy, inputIterableElementValueCoder)))
+ .setCoder(outputKvCoder);
+ }
+
+ private <W extends BoundedWindow>
+ GroupAlsoByWindowsViaOutputBufferDoFn<K, V, Iterable<V>, W> groupAlsoByWindowsFn(
+ WindowingStrategy<?, W> strategy, Coder<V> inputIterableElementValueCoder) {
+ return new GroupAlsoByWindowsViaOutputBufferDoFn<K, V, Iterable<V>, W>(
+ strategy, SystemReduceFn.<K, V, W>buffering(inputIterableElementValueCoder));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bcc010c2/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReifyTimestampsAndWindows.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReifyTimestampsAndWindows.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReifyTimestampsAndWindows.java
deleted file mode 100644
index 1a6cf9a..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReifyTimestampsAndWindows.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.KvCoder;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.util.WindowedValue.FullWindowedValueCoder;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-
-/**
- * Helper transform that makes timestamps and window assignments
- * explicit in the value part of each key/value pair.
- */
-public class ReifyTimestampsAndWindows<K, V>
- extends PTransform<PCollection<KV<K, V>>,
- PCollection<KV<K, WindowedValue<V>>>> {
- @Override
- public PCollection<KV<K, WindowedValue<V>>> apply(
- PCollection<KV<K, V>> input) {
- @SuppressWarnings("unchecked")
- KvCoder<K, V> inputKvCoder = (KvCoder<K, V>) input.getCoder();
- Coder<K> keyCoder = inputKvCoder.getKeyCoder();
- Coder<V> inputValueCoder = inputKvCoder.getValueCoder();
- Coder<WindowedValue<V>> outputValueCoder = FullWindowedValueCoder.of(
- inputValueCoder, input.getWindowingStrategy().getWindowFn().windowCoder());
- Coder<KV<K, WindowedValue<V>>> outputKvCoder =
- KvCoder.of(keyCoder, outputValueCoder);
- return input.apply(ParDo.of(new ReifyTimestampAndWindowsDoFn<K, V>()))
- .setCoder(outputKvCoder);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bcc010c2/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactoryTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactoryTest.java
index a683b31..9933ec1 100644
--- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactoryTest.java
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactoryTest.java
@@ -26,9 +26,9 @@ import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.C
import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
import com.google.cloud.dataflow.sdk.testing.TestPipeline;
import com.google.cloud.dataflow.sdk.transforms.Create;
+import com.google.cloud.dataflow.sdk.util.GroupByKeyViaGroupByKeyOnly.ReifyTimestampsAndWindows;
import com.google.cloud.dataflow.sdk.util.KeyedWorkItem;
import com.google.cloud.dataflow.sdk.util.KeyedWorkItems;
-import com.google.cloud.dataflow.sdk.util.ReifyTimestampsAndWindows;
import com.google.cloud.dataflow.sdk.util.WindowedValue;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PCollection;