You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/04/05 22:56:38 UTC

[4/8] incubator-beam git commit: fixup! Move GroupByKey expansion into DirectPipelineRunner

fixup! Move GroupByKey expansion into DirectPipelineRunner


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/bcc010c2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/bcc010c2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/bcc010c2

Branch: refs/heads/master
Commit: bcc010c2ffc3515321c0cde8eedc4cb18f29be38
Parents: 080eeaa
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Mar 28 12:04:38 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Mar 28 12:30:04 2016 -0700

----------------------------------------------------------------------
 .../FlinkBatchTransformTranslators.java         |  21 +-
 .../beam/runners/spark/SparkPipelineRunner.java |  20 ++
 .../sdk/runners/DirectPipelineRunner.java       | 140 +-----------
 .../inprocess/GroupByKeyEvaluatorFactory.java   |   2 +-
 .../cloud/dataflow/sdk/util/GroupByKeyOnly.java |  43 ----
 .../sdk/util/GroupByKeyViaGroupByKeyOnly.java   | 226 +++++++++++++++++++
 .../sdk/util/ReifyTimestampsAndWindows.java     |  48 ----
 .../GroupByKeyEvaluatorFactoryTest.java         |   2 +-
 8 files changed, 253 insertions(+), 249 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bcc010c2/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
index b09d033..b3c0cea 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
@@ -53,7 +53,6 @@ import com.google.cloud.dataflow.sdk.transforms.join.CoGbkResultSchema;
 import com.google.cloud.dataflow.sdk.transforms.join.CoGroupByKey;
 import com.google.cloud.dataflow.sdk.transforms.join.KeyedPCollectionTuple;
 import com.google.cloud.dataflow.sdk.transforms.join.RawUnionValue;
-import com.google.cloud.dataflow.sdk.util.GroupByKeyOnly;
 import com.google.cloud.dataflow.sdk.values.KV;
 import com.google.cloud.dataflow.sdk.values.PCollection;
 import com.google.cloud.dataflow.sdk.values.PCollectionView;
@@ -113,7 +112,6 @@ public class FlinkBatchTransformTranslators {
 
     TRANSLATORS.put(Flatten.FlattenPCollectionList.class, new FlattenPCollectionTranslatorBatch());
 
-    TRANSLATORS.put(GroupByKeyOnly.class, new GroupByKeyOnlyTranslatorBatch());
     // TODO we're currently ignoring windows here but that has to change in the future
     TRANSLATORS.put(GroupByKey.class, new GroupByKeyTranslatorBatch());
 
@@ -303,25 +301,8 @@ public class FlinkBatchTransformTranslators {
     }
   }
 
-  private static class GroupByKeyOnlyTranslatorBatch<K, V> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<GroupByKeyOnly<K, V>> {
-
-    @Override
-    public void translateNode(GroupByKeyOnly<K, V> transform, FlinkBatchTranslationContext context) {
-      DataSet<KV<K, V>> inputDataSet = context.getInputDataSet(context.getInput(transform));
-      GroupReduceFunction<KV<K, V>, KV<K, Iterable<V>>> groupReduceFunction = new FlinkKeyedListAggregationFunction<>();
-
-      TypeInformation<KV<K, Iterable<V>>> typeInformation = context.getTypeInfo(context.getOutput(transform));
-
-      Grouping<KV<K, V>> grouping = new UnsortedGrouping<>(inputDataSet, new Keys.ExpressionKeys<>(new String[]{"key"}, inputDataSet.getType()));
-
-      GroupReduceOperator<KV<K, V>, KV<K, Iterable<V>>> outputDataSet =
-          new GroupReduceOperator<>(grouping, typeInformation, groupReduceFunction, transform.getName());
-      context.setOutputDataSet(context.getOutput(transform), outputDataSet);
-    }
-  }
-
   /**
-   * Translates a GroupByKey while ignoring window assignments. This is identical to the {@link GroupByKeyOnlyTranslatorBatch}
+   * Translates a GroupByKey while ignoring window assignments. Currently ignores windows.
    */
   private static class GroupByKeyTranslatorBatch<K, V> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<GroupByKey<K, V>> {
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bcc010c2/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
index d5e4186..71e358c 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
@@ -23,7 +23,10 @@ import com.google.cloud.dataflow.sdk.options.PipelineOptions;
 import com.google.cloud.dataflow.sdk.options.PipelineOptionsValidator;
 import com.google.cloud.dataflow.sdk.runners.PipelineRunner;
 import com.google.cloud.dataflow.sdk.runners.TransformTreeNode;
+import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
 import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.util.GroupByKeyViaGroupByKeyOnly;
+import com.google.cloud.dataflow.sdk.values.PCollection;
 import com.google.cloud.dataflow.sdk.values.PInput;
 import com.google.cloud.dataflow.sdk.values.POutput;
 import com.google.cloud.dataflow.sdk.values.PValue;
@@ -105,6 +108,23 @@ public final class SparkPipelineRunner extends PipelineRunner<EvaluationResult>
   }
 
   /**
+   * Overrides for this runner.
+   */
+  @SuppressWarnings("rawtypes")
+  @Override
+  public <OT extends POutput, IT extends PInput> OT apply(
+      PTransform<IT, OT> transform, IT input) {
+
+    if (transform instanceof GroupByKey) {
+      return (OT) ((PCollection) input).apply(
+          new GroupByKeyViaGroupByKeyOnly((GroupByKey) transform));
+    } else {
+      return super.apply(transform, input);
+    }
+  }
+
+
+  /**
    * No parameter constructor defaults to running this pipeline in Spark's local mode, in a single
    * thread.
    */

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bcc010c2/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DirectPipelineRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DirectPipelineRunner.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DirectPipelineRunner.java
index 629be83..3940d32 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DirectPipelineRunner.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DirectPipelineRunner.java
@@ -26,8 +26,6 @@ import com.google.cloud.dataflow.sdk.PipelineResult;
 import com.google.cloud.dataflow.sdk.coders.CannotProvideCoderException;
 import com.google.cloud.dataflow.sdk.coders.Coder;
 import com.google.cloud.dataflow.sdk.coders.CoderException;
-import com.google.cloud.dataflow.sdk.coders.IterableCoder;
-import com.google.cloud.dataflow.sdk.coders.KvCoder;
 import com.google.cloud.dataflow.sdk.coders.ListCoder;
 import com.google.cloud.dataflow.sdk.io.AvroIO;
 import com.google.cloud.dataflow.sdk.io.FileBasedSink;
@@ -49,19 +47,15 @@ import com.google.cloud.dataflow.sdk.transforms.Partition;
 import com.google.cloud.dataflow.sdk.transforms.Partition.PartitionFn;
 import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
 import com.google.cloud.dataflow.sdk.util.AppliedCombineFn;
-import com.google.cloud.dataflow.sdk.util.GroupAlsoByWindowsViaOutputBufferDoFn;
-import com.google.cloud.dataflow.sdk.util.GroupByKeyOnly;
+import com.google.cloud.dataflow.sdk.util.GroupByKeyViaGroupByKeyOnly;
+import com.google.cloud.dataflow.sdk.util.GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly;
 import com.google.cloud.dataflow.sdk.util.IOChannelUtils;
 import com.google.cloud.dataflow.sdk.util.MapAggregatorValues;
 import com.google.cloud.dataflow.sdk.util.PerKeyCombineFnRunner;
 import com.google.cloud.dataflow.sdk.util.PerKeyCombineFnRunners;
-import com.google.cloud.dataflow.sdk.util.ReifyTimestampsAndWindows;
 import com.google.cloud.dataflow.sdk.util.SerializableUtils;
-import com.google.cloud.dataflow.sdk.util.SystemReduceFn;
 import com.google.cloud.dataflow.sdk.util.TestCredential;
 import com.google.cloud.dataflow.sdk.util.WindowedValue;
-import com.google.cloud.dataflow.sdk.util.WindowedValue.WindowedValueCoder;
-import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
 import com.google.cloud.dataflow.sdk.util.common.Counter;
 import com.google.cloud.dataflow.sdk.util.common.CounterSet;
 import com.google.cloud.dataflow.sdk.values.KV;
@@ -85,7 +79,6 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -259,7 +252,8 @@ public class DirectPipelineRunner
     } else if (transform instanceof AvroIO.Write.Bound) {
       return (OutputT) applyAvroIOWrite((AvroIO.Write.Bound) transform, (PCollection<?>) input);
     } else if (transform instanceof GroupByKey) {
-      return (OutputT) ((PCollection) input).apply(new DirectGroupByKey((GroupByKey) transform));
+      return (OutputT)
+          ((PCollection) input).apply(new GroupByKeyViaGroupByKeyOnly((GroupByKey) transform));
     } else {
       return super.apply(transform, input);
     }
@@ -405,43 +399,6 @@ public class DirectPipelineRunner
     }
   }
 
-  private static class DirectGroupByKey<K, V>
-      extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> {
-
-    private GroupByKey<K, V> originalTransform;
-
-    public DirectGroupByKey(GroupByKey<K, V> originalTransform) {
-      this.originalTransform = originalTransform;
-    }
-
-    @Override
-    public PCollection<KV<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) {
-      WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
-
-      return input
-          // Make each input element's timestamp and assigned windows
-          // explicit, in the value part.
-          .apply(new ReifyTimestampsAndWindows<K, V>())
-
-          // Group by just the key.
-          // Combiner lifting will not happen regardless of the disallowCombinerLifting value.
-          // There will be no combiners right after the GroupByKeyOnly because of the two ParDos
-          // introduced in here.
-          .apply(new GroupByKeyOnly<K, WindowedValue<V>>())
-
-          // Sort each key's values by timestamp. GroupAlsoByWindow requires
-          // its input to be sorted by timestamp.
-          .apply(new DirectPipelineRunner.SortValuesByTimestamp<K, V>())
-
-          // Group each key's values by window, merging windows as needed.
-          .apply(new DirectPipelineRunner.GroupAlsoByWindow<K, V>(windowingStrategy))
-
-          // And update the windowing strategy as appropriate.
-          .setWindowingStrategyInternal(
-              originalTransform.updateWindowingStrategy(windowingStrategy));
-    }
-  }
-
   /**
    * Apply the override for AvroIO.Write.Bound if the user requested sharding controls
    * greater than one.
@@ -1172,95 +1129,6 @@ public class DirectPipelineRunner
   /////////////////////////////////////////////////////////////////////////////
 
   /**
-   * Helper transform that sorts the values associated with each key
-   * by timestamp.
-   */
-  private static class SortValuesByTimestamp<K, V>
-      extends PTransform<PCollection<KV<K, Iterable<WindowedValue<V>>>>,
-                         PCollection<KV<K, Iterable<WindowedValue<V>>>>> {
-    @Override
-    public PCollection<KV<K, Iterable<WindowedValue<V>>>> apply(
-        PCollection<KV<K, Iterable<WindowedValue<V>>>> input) {
-      return input.apply(ParDo.of(
-          new DoFn<KV<K, Iterable<WindowedValue<V>>>,
-                   KV<K, Iterable<WindowedValue<V>>>>() {
-            @Override
-            public void processElement(ProcessContext c) {
-              KV<K, Iterable<WindowedValue<V>>> kvs = c.element();
-              K key = kvs.getKey();
-              Iterable<WindowedValue<V>> unsortedValues = kvs.getValue();
-              List<WindowedValue<V>> sortedValues = new ArrayList<>();
-              for (WindowedValue<V> value : unsortedValues) {
-                sortedValues.add(value);
-              }
-              Collections.sort(sortedValues,
-                               new Comparator<WindowedValue<V>>() {
-                  @Override
-                  public int compare(WindowedValue<V> e1, WindowedValue<V> e2) {
-                    return e1.getTimestamp().compareTo(e2.getTimestamp());
-                  }
-                });
-              c.output(KV.<K, Iterable<WindowedValue<V>>>of(key, sortedValues));
-            }}))
-          .setCoder(input.getCoder());
-    }
-  }
-
-  /**
-   * Helper transform that takes a collection of timestamp-ordered
-   * values associated with each key, groups the values by window,
-   * combines windows as needed, and for each window in each key,
-   * outputs a collection of key/value-list pairs implicitly assigned
-   * to the window and with the timestamp derived from that window.
-   */
-  private static class GroupAlsoByWindow<K, V>
-      extends PTransform<PCollection<KV<K, Iterable<WindowedValue<V>>>>,
-                         PCollection<KV<K, Iterable<V>>>> {
-    private final WindowingStrategy<?, ?> windowingStrategy;
-
-    public GroupAlsoByWindow(WindowingStrategy<?, ?> windowingStrategy) {
-      this.windowingStrategy = windowingStrategy;
-    }
-
-    @Override
-    @SuppressWarnings("unchecked")
-    public PCollection<KV<K, Iterable<V>>> apply(
-        PCollection<KV<K, Iterable<WindowedValue<V>>>> input) {
-      @SuppressWarnings("unchecked")
-      KvCoder<K, Iterable<WindowedValue<V>>> inputKvCoder =
-          (KvCoder<K, Iterable<WindowedValue<V>>>) input.getCoder();
-
-      Coder<K> keyCoder = inputKvCoder.getKeyCoder();
-      Coder<Iterable<WindowedValue<V>>> inputValueCoder =
-          inputKvCoder.getValueCoder();
-
-      IterableCoder<WindowedValue<V>> inputIterableValueCoder =
-          (IterableCoder<WindowedValue<V>>) inputValueCoder;
-      Coder<WindowedValue<V>> inputIterableElementCoder =
-          inputIterableValueCoder.getElemCoder();
-      WindowedValueCoder<V> inputIterableWindowedValueCoder =
-          (WindowedValueCoder<V>) inputIterableElementCoder;
-
-      Coder<V> inputIterableElementValueCoder =
-          inputIterableWindowedValueCoder.getValueCoder();
-      Coder<Iterable<V>> outputValueCoder =
-          IterableCoder.of(inputIterableElementValueCoder);
-      Coder<KV<K, Iterable<V>>> outputKvCoder = KvCoder.of(keyCoder, outputValueCoder);
-
-      return input
-          .apply(ParDo.of(groupAlsoByWindowsFn(windowingStrategy, inputIterableElementValueCoder)))
-          .setCoder(outputKvCoder);
-    }
-
-    private <W extends BoundedWindow> GroupAlsoByWindowsViaOutputBufferDoFn<K, V, Iterable<V>, W>
-        groupAlsoByWindowsFn(
-            WindowingStrategy<?, W> strategy, Coder<V> inputIterableElementValueCoder) {
-      return new GroupAlsoByWindowsViaOutputBufferDoFn<K, V, Iterable<V>, W>(
-          strategy, SystemReduceFn.<K, V, W>buffering(inputIterableElementValueCoder));
-    }
-  }
-
-  /**
    * The key by which GBK groups inputs - elements are grouped by the encoded form of the key,
    * but the original key may be accessed as well.
    */

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bcc010c2/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactory.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactory.java
index b59ec56..4f97db0 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactory.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactory.java
@@ -31,10 +31,10 @@ import com.google.cloud.dataflow.sdk.transforms.PTransform;
 import com.google.cloud.dataflow.sdk.transforms.ParDo;
 import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
 import com.google.cloud.dataflow.sdk.util.GroupAlsoByWindowViaWindowSetDoFn;
+import com.google.cloud.dataflow.sdk.util.GroupByKeyViaGroupByKeyOnly.ReifyTimestampsAndWindows;
 import com.google.cloud.dataflow.sdk.util.KeyedWorkItem;
 import com.google.cloud.dataflow.sdk.util.KeyedWorkItemCoder;
 import com.google.cloud.dataflow.sdk.util.KeyedWorkItems;
-import com.google.cloud.dataflow.sdk.util.ReifyTimestampsAndWindows;
 import com.google.cloud.dataflow.sdk.util.SystemReduceFn;
 import com.google.cloud.dataflow.sdk.util.WindowedValue;
 import com.google.cloud.dataflow.sdk.util.WindowingStrategy;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bcc010c2/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/GroupByKeyOnly.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/GroupByKeyOnly.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/GroupByKeyOnly.java
deleted file mode 100644
index 8db87d2..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/GroupByKeyOnly.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-
-/**
- * Runner-specific primitive that groups by key only, ignoring any window assignments.
- */
-public class GroupByKeyOnly<K, V>
-    extends PTransform<PCollection<KV<K, V>>,
-                       PCollection<KV<K, Iterable<V>>>> {
-
-  @SuppressWarnings({"rawtypes", "unchecked"})
-  @Override
-  public PCollection<KV<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) {
-    return PCollection.<KV<K, Iterable<V>>>createPrimitiveOutputInternal(
-        input.getPipeline(), input.getWindowingStrategy(), input.isBounded());
-  }
-
-  @Override
-  public Coder<KV<K, Iterable<V>>> getDefaultOutputCoder(PCollection<KV<K, V>> input) {
-    return GroupByKey.getOutputKvCoder(input.getCoder());
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bcc010c2/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/GroupByKeyViaGroupByKeyOnly.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/GroupByKeyViaGroupByKeyOnly.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/GroupByKeyViaGroupByKeyOnly.java
new file mode 100644
index 0000000..4ce042d
--- /dev/null
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/GroupByKeyViaGroupByKeyOnly.java
@@ -0,0 +1,226 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.google.cloud.dataflow.sdk.util;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import com.google.cloud.dataflow.sdk.coders.IterableCoder;
+import com.google.cloud.dataflow.sdk.coders.KvCoder;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
+import com.google.cloud.dataflow.sdk.util.WindowedValue.FullWindowedValueCoder;
+import com.google.cloud.dataflow.sdk.util.WindowedValue.WindowedValueCoder;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+/**
+ * An implementation of {@link GroupByKey} built on top of a simpler {@link GroupByKeyOnly}
+ * primitive.
+ *
+ * <p>This implementation of {@link GroupByKey} proceeds by reifying windows and timestamps (making
+ * them part of the element rather than metadata), performing a {@link GroupByKeyOnly} primitive,
+ * then using a {@link GroupAlsoByWindow} transform to further group the resulting elements by
+ * window.
+ *
+ * <p>Today {@link GroupAlsoByWindow} is implemented as a {@link ParDo} that calls reserved
+ * internal methods.
+ */
+public class GroupByKeyViaGroupByKeyOnly<K, V>
+    extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> {
+
+  private GroupByKey<K, V> gbkTransform;
+
+  public GroupByKeyViaGroupByKeyOnly(GroupByKey<K, V> originalTransform) {
+    this.gbkTransform = originalTransform;
+  }
+
+  @Override
+  public PCollection<KV<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) {
+    WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
+
+    return input
+        // Make each input element's timestamp and assigned windows
+        // explicit, in the value part.
+        .apply(new ReifyTimestampsAndWindows<K, V>())
+
+        // Group by just the key.
+        // Combiner lifting will not happen regardless of the disallowCombinerLifting value.
+        // There will be no combiners right after the GroupByKeyOnly because of the two ParDos
+        // introduced in here.
+        .apply(new GroupByKeyOnly<K, WindowedValue<V>>())
+
+        // Sort each key's values by timestamp. GroupAlsoByWindow requires
+        // its input to be sorted by timestamp.
+        .apply(new SortValuesByTimestamp<K, V>())
+
+        // Group each key's values by window, merging windows as needed.
+        .apply(new GroupAlsoByWindow<K, V>(windowingStrategy))
+
+        // And update the windowing strategy as appropriate.
+        .setWindowingStrategyInternal(
+            gbkTransform.updateWindowingStrategy(windowingStrategy));
+  }
+
+  /**
+   * Runner-specific primitive that groups by key only, ignoring any window assignments. A
+   * runner that uses {@link GroupByKeyViaGroupByKeyOnly} should have a primitive way to translate
+   * or evaluate this class.
+   */
+  public static class GroupByKeyOnly<K, V>
+      extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> {
+
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    @Override
+    public PCollection<KV<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) {
+      return PCollection.<KV<K, Iterable<V>>>createPrimitiveOutputInternal(
+          input.getPipeline(), input.getWindowingStrategy(), input.isBounded());
+    }
+
+    @Override
+    public Coder<KV<K, Iterable<V>>> getDefaultOutputCoder(PCollection<KV<K, V>> input) {
+      return GroupByKey.getOutputKvCoder(input.getCoder());
+    }
+  }
+
+  /**
+   * Helper transform that makes timestamps and window assignments
+   * explicit in the value part of each key/value pair.
+   */
+  public static class ReifyTimestampsAndWindows<K, V>
+      extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, WindowedValue<V>>>> {
+    @Override
+    public PCollection<KV<K, WindowedValue<V>>> apply(PCollection<KV<K, V>> input) {
+
+      // The requirement to use a KvCoder *is* actually a model-level requirement, not specific
+      // to this implementation of GBK. All runners need a way to get the key.
+      checkArgument(input.getCoder() instanceof KvCoder,
+          "%s requires its input to use a %s",
+          GroupByKey.class.getSimpleName(),
+          KvCoder.class.getSimpleName());
+
+      @SuppressWarnings("unchecked")
+      KvCoder<K, V> inputKvCoder = (KvCoder<K, V>) input.getCoder();
+      Coder<K> keyCoder = inputKvCoder.getKeyCoder();
+      Coder<V> inputValueCoder = inputKvCoder.getValueCoder();
+      Coder<WindowedValue<V>> outputValueCoder =
+          FullWindowedValueCoder.of(
+              inputValueCoder, input.getWindowingStrategy().getWindowFn().windowCoder());
+      Coder<KV<K, WindowedValue<V>>> outputKvCoder = KvCoder.of(keyCoder, outputValueCoder);
+      return input
+          .apply(ParDo.of(new ReifyTimestampAndWindowsDoFn<K, V>()))
+          .setCoder(outputKvCoder);
+    }
+  }
+
+  /**
+   * Helper transform that sorts the values associated with each key
+   * by timestamp.
+   */
+  private static class SortValuesByTimestamp<K, V>
+      extends PTransform<
+          PCollection<KV<K, Iterable<WindowedValue<V>>>>,
+          PCollection<KV<K, Iterable<WindowedValue<V>>>>> {
+    @Override
+    public PCollection<KV<K, Iterable<WindowedValue<V>>>> apply(
+        PCollection<KV<K, Iterable<WindowedValue<V>>>> input) {
+      return input
+          .apply(
+              ParDo.of(
+                  new DoFn<KV<K, Iterable<WindowedValue<V>>>, KV<K, Iterable<WindowedValue<V>>>>() {
+                    @Override
+                    public void processElement(ProcessContext c) {
+                      KV<K, Iterable<WindowedValue<V>>> kvs = c.element();
+                      K key = kvs.getKey();
+                      Iterable<WindowedValue<V>> unsortedValues = kvs.getValue();
+                      List<WindowedValue<V>> sortedValues = new ArrayList<>();
+                      for (WindowedValue<V> value : unsortedValues) {
+                        sortedValues.add(value);
+                      }
+                      Collections.sort(
+                          sortedValues,
+                          new Comparator<WindowedValue<V>>() {
+                            @Override
+                            public int compare(WindowedValue<V> e1, WindowedValue<V> e2) {
+                              return e1.getTimestamp().compareTo(e2.getTimestamp());
+                            }
+                          });
+                      c.output(KV.<K, Iterable<WindowedValue<V>>>of(key, sortedValues));
+                    }
+                  }))
+          .setCoder(input.getCoder());
+    }
+  }
+
+  /**
+   * Helper transform that takes a collection of timestamp-ordered
+   * values associated with each key, groups the values by window,
+   * combines windows as needed, and for each window in each key,
+   * outputs a collection of key/value-list pairs implicitly assigned
+   * to the window and with the timestamp derived from that window.
+   */
+  private static class GroupAlsoByWindow<K, V>
+      extends PTransform<
+          PCollection<KV<K, Iterable<WindowedValue<V>>>>, PCollection<KV<K, Iterable<V>>>> {
+    private final WindowingStrategy<?, ?> windowingStrategy;
+
+    public GroupAlsoByWindow(WindowingStrategy<?, ?> windowingStrategy) {
+      this.windowingStrategy = windowingStrategy;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public PCollection<KV<K, Iterable<V>>> apply(
+        PCollection<KV<K, Iterable<WindowedValue<V>>>> input) {
+      @SuppressWarnings("unchecked")
+      KvCoder<K, Iterable<WindowedValue<V>>> inputKvCoder =
+          (KvCoder<K, Iterable<WindowedValue<V>>>) input.getCoder();
+
+      Coder<K> keyCoder = inputKvCoder.getKeyCoder();
+      Coder<Iterable<WindowedValue<V>>> inputValueCoder = inputKvCoder.getValueCoder();
+
+      IterableCoder<WindowedValue<V>> inputIterableValueCoder =
+          (IterableCoder<WindowedValue<V>>) inputValueCoder;
+      Coder<WindowedValue<V>> inputIterableElementCoder = inputIterableValueCoder.getElemCoder();
+      WindowedValueCoder<V> inputIterableWindowedValueCoder =
+          (WindowedValueCoder<V>) inputIterableElementCoder;
+
+      Coder<V> inputIterableElementValueCoder = inputIterableWindowedValueCoder.getValueCoder();
+      Coder<Iterable<V>> outputValueCoder = IterableCoder.of(inputIterableElementValueCoder);
+      Coder<KV<K, Iterable<V>>> outputKvCoder = KvCoder.of(keyCoder, outputValueCoder);
+
+      return input
+          .apply(ParDo.of(groupAlsoByWindowsFn(windowingStrategy, inputIterableElementValueCoder)))
+          .setCoder(outputKvCoder);
+    }
+
+    private <W extends BoundedWindow>
+        GroupAlsoByWindowsViaOutputBufferDoFn<K, V, Iterable<V>, W> groupAlsoByWindowsFn(
+            WindowingStrategy<?, W> strategy, Coder<V> inputIterableElementValueCoder) {
+      return new GroupAlsoByWindowsViaOutputBufferDoFn<K, V, Iterable<V>, W>(
+          strategy, SystemReduceFn.<K, V, W>buffering(inputIterableElementValueCoder));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bcc010c2/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReifyTimestampsAndWindows.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReifyTimestampsAndWindows.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReifyTimestampsAndWindows.java
deleted file mode 100644
index 1a6cf9a..0000000
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReifyTimestampsAndWindows.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.util;
-
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.KvCoder;
-import com.google.cloud.dataflow.sdk.transforms.PTransform;
-import com.google.cloud.dataflow.sdk.transforms.ParDo;
-import com.google.cloud.dataflow.sdk.util.WindowedValue.FullWindowedValueCoder;
-import com.google.cloud.dataflow.sdk.values.KV;
-import com.google.cloud.dataflow.sdk.values.PCollection;
-
-/**
- * Helper transform that makes timestamps and window assignments
- * explicit in the value part of each key/value pair.
- */
-public class ReifyTimestampsAndWindows<K, V>
-    extends PTransform<PCollection<KV<K, V>>,
-                       PCollection<KV<K, WindowedValue<V>>>> {
-  @Override
-  public PCollection<KV<K, WindowedValue<V>>> apply(
-      PCollection<KV<K, V>> input) {
-    @SuppressWarnings("unchecked")
-    KvCoder<K, V> inputKvCoder = (KvCoder<K, V>) input.getCoder();
-    Coder<K> keyCoder = inputKvCoder.getKeyCoder();
-    Coder<V> inputValueCoder = inputKvCoder.getValueCoder();
-    Coder<WindowedValue<V>> outputValueCoder = FullWindowedValueCoder.of(
-        inputValueCoder, input.getWindowingStrategy().getWindowFn().windowCoder());
-    Coder<KV<K, WindowedValue<V>>> outputKvCoder =
-        KvCoder.of(keyCoder, outputValueCoder);
-    return input.apply(ParDo.of(new ReifyTimestampAndWindowsDoFn<K, V>()))
-        .setCoder(outputKvCoder);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/bcc010c2/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactoryTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactoryTest.java
index a683b31..9933ec1 100644
--- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactoryTest.java
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactoryTest.java
@@ -26,9 +26,9 @@ import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.C
 import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
 import com.google.cloud.dataflow.sdk.testing.TestPipeline;
 import com.google.cloud.dataflow.sdk.transforms.Create;
+import com.google.cloud.dataflow.sdk.util.GroupByKeyViaGroupByKeyOnly.ReifyTimestampsAndWindows;
 import com.google.cloud.dataflow.sdk.util.KeyedWorkItem;
 import com.google.cloud.dataflow.sdk.util.KeyedWorkItems;
-import com.google.cloud.dataflow.sdk.util.ReifyTimestampsAndWindows;
 import com.google.cloud.dataflow.sdk.util.WindowedValue;
 import com.google.cloud.dataflow.sdk.values.KV;
 import com.google.cloud.dataflow.sdk.values.PCollection;