You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/05/19 19:04:17 UTC

[1/3] incubator-beam git commit: Add accessors for sub-coders of KeyedWorkItemCoder

Repository: incubator-beam
Updated Branches:
  refs/heads/master f184bcf37 -> 2f18cd268


Add accessors for sub-coders of KeyedWorkItemCoder


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ca1b3a87
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ca1b3a87
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ca1b3a87

Branch: refs/heads/master
Commit: ca1b3a87fbe7218c0af912ba0f0deae8b903b1ac
Parents: 93a5d39
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Apr 28 15:51:40 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Fri May 13 12:02:02 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/util/KeyedWorkItemCoder.java    | 8 ++++++++
 1 file changed, 8 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ca1b3a87/sdks/java/core/src/main/java/org/apache/beam/sdk/util/KeyedWorkItemCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/KeyedWorkItemCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/KeyedWorkItemCoder.java
index 763f68b..ec5d821 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/KeyedWorkItemCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/KeyedWorkItemCoder.java
@@ -79,6 +79,14 @@ public class KeyedWorkItemCoder<K, ElemT> extends StandardCoder<KeyedWorkItem<K,
     this.elemsCoder = IterableCoder.of(FullWindowedValueCoder.of(elemCoder, windowCoder));
   }
 
+  public Coder<K> getKeyCoder() {
+    return keyCoder;
+  }
+
+  public Coder<ElemT> getElementCoder() {
+    return elemCoder;
+  }
+
   @Override
   public void encode(KeyedWorkItem<K, ElemT> value, OutputStream outStream, Coder.Context context)
       throws CoderException, IOException {


[2/3] incubator-beam git commit: Make in-process GroupByKey respect future Beam model

Posted by ke...@apache.org.
Make in-process GroupByKey respect future Beam model

This introduces or clarifies the following transforms:

 - InProcessGroupByKey, which expands like GroupByKeyViaGroupByKeyOnly
   but with different intermediate PCollection types.
 - InProcessGroupByKeyOnly, which outputs KeyedWorkItem<K, V>. This existed
   already under a different name.
 - InProcessGroupAlsoByWindow, which is evaluated directly and
   accepts input elements of type KeyedWorkItem<K, V>.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/aad284a5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/aad284a5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/aad284a5

Branch: refs/heads/master
Commit: aad284a513e829552e9ae7fa10ea89cfd89bdb5f
Parents: ca1b3a8
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Apr 28 16:12:21 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Fri May 13 12:04:31 2016 -0700

----------------------------------------------------------------------
 .../beam/runners/direct/BundleFactory.java      |   2 +-
 .../direct/GroupByKeyEvaluatorFactory.java      | 274 -------------------
 .../direct/InProcessEvaluationContext.java      |   2 +-
 ...rocessGroupAlsoByWindowEvaluatorFactory.java | 127 +++++++++
 .../runners/direct/InProcessGroupByKey.java     | 132 +++++++++
 ...InProcessGroupByKeyOnlyEvaluatorFactory.java | 183 +++++++++++++
 .../InProcessGroupByKeyOverrideFactory.java     |  41 +++
 .../runners/direct/InProcessPipelineRunner.java |   3 +-
 .../direct/TransformEvaluatorRegistry.java      |   8 +-
 .../direct/GroupByKeyEvaluatorFactoryTest.java  |   4 +-
 ...ocessGroupByKeyOnlyEvaluatorFactoryTest.java | 183 +++++++++++++
 11 files changed, 676 insertions(+), 283 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aad284a5/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BundleFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BundleFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BundleFactory.java
index 34529e7..fea4841 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BundleFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BundleFactory.java
@@ -17,7 +17,7 @@
  */
 package org.apache.beam.runners.direct;
 
-import org.apache.beam.runners.direct.GroupByKeyEvaluatorFactory.InProcessGroupByKeyOnly;
+import org.apache.beam.runners.direct.InProcessGroupByKey.InProcessGroupByKeyOnly;
 import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
 import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
 import org.apache.beam.sdk.transforms.PTransform;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aad284a5/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactory.java
deleted file mode 100644
index 9a08996..0000000
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactory.java
+++ /dev/null
@@ -1,274 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.direct;
-
-import static org.apache.beam.sdk.util.CoderUtils.encodeToByteArray;
-
-import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
-import org.apache.beam.runners.direct.StepTransformResult.Builder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.IterableCoder;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly.ReifyTimestampsAndWindows;
-import org.apache.beam.sdk.util.KeyedWorkItem;
-import org.apache.beam.sdk.util.KeyedWorkItemCoder;
-import org.apache.beam.sdk.util.KeyedWorkItems;
-import org.apache.beam.sdk.util.SystemReduceFn;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PInput;
-import org.apache.beam.sdk.values.POutput;
-
-import com.google.common.annotations.VisibleForTesting;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * The {@link InProcessPipelineRunner} {@link TransformEvaluatorFactory} for the {@link GroupByKey}
- * {@link PTransform}.
- */
-class GroupByKeyEvaluatorFactory implements TransformEvaluatorFactory {
-  @Override
-  public <InputT> TransformEvaluator<InputT> forApplication(
-      AppliedPTransform<?, ?, ?> application,
-      CommittedBundle<?> inputBundle,
-      InProcessEvaluationContext evaluationContext) {
-    @SuppressWarnings({"cast", "unchecked", "rawtypes"})
-    TransformEvaluator<InputT> evaluator = createEvaluator(
-            (AppliedPTransform) application, (CommittedBundle) inputBundle, evaluationContext);
-    return evaluator;
-  }
-
-  private <K, V> TransformEvaluator<KV<K, WindowedValue<V>>> createEvaluator(
-      final AppliedPTransform<
-              PCollection<KV<K, WindowedValue<V>>>, PCollection<KeyedWorkItem<K, V>>,
-              InProcessGroupByKeyOnly<K, V>>
-          application,
-      final CommittedBundle<KV<K, V>> inputBundle,
-      final InProcessEvaluationContext evaluationContext) {
-    return new GroupByKeyEvaluator<K, V>(evaluationContext, inputBundle, application);
-  }
-
-  private static class GroupByKeyEvaluator<K, V>
-      implements TransformEvaluator<KV<K, WindowedValue<V>>> {
-    private final InProcessEvaluationContext evaluationContext;
-
-    private final CommittedBundle<KV<K, V>> inputBundle;
-    private final AppliedPTransform<
-            PCollection<KV<K, WindowedValue<V>>>, PCollection<KeyedWorkItem<K, V>>,
-            InProcessGroupByKeyOnly<K, V>>
-        application;
-    private final Coder<K> keyCoder;
-    private Map<GroupingKey<K>, List<WindowedValue<V>>> groupingMap;
-
-    public GroupByKeyEvaluator(
-        InProcessEvaluationContext evaluationContext,
-        CommittedBundle<KV<K, V>> inputBundle,
-        AppliedPTransform<
-                PCollection<KV<K, WindowedValue<V>>>, PCollection<KeyedWorkItem<K, V>>,
-                InProcessGroupByKeyOnly<K, V>>
-            application) {
-      this.evaluationContext = evaluationContext;
-      this.inputBundle = inputBundle;
-      this.application = application;
-
-      PCollection<KV<K, WindowedValue<V>>> input = application.getInput();
-      keyCoder = getKeyCoder(input.getCoder());
-      groupingMap = new HashMap<>();
-    }
-
-    private Coder<K> getKeyCoder(Coder<KV<K, WindowedValue<V>>> coder) {
-      if (!(coder instanceof KvCoder)) {
-        throw new IllegalStateException();
-      }
-      @SuppressWarnings("unchecked")
-      Coder<K> keyCoder = ((KvCoder<K, WindowedValue<V>>) coder).getKeyCoder();
-      return keyCoder;
-    }
-
-    @Override
-    public void processElement(WindowedValue<KV<K, WindowedValue<V>>> element) {
-      KV<K, WindowedValue<V>> kv = element.getValue();
-      K key = kv.getKey();
-      byte[] encodedKey;
-      try {
-        encodedKey = encodeToByteArray(keyCoder, key);
-      } catch (CoderException exn) {
-        // TODO: Put in better element printing:
-        // truncate if too long.
-        throw new IllegalArgumentException(
-            String.format("unable to encode key %s of input to %s using %s", key, this, keyCoder),
-            exn);
-      }
-      GroupingKey<K> groupingKey = new GroupingKey<>(key, encodedKey);
-      List<WindowedValue<V>> values = groupingMap.get(groupingKey);
-      if (values == null) {
-        values = new ArrayList<WindowedValue<V>>();
-        groupingMap.put(groupingKey, values);
-      }
-      values.add(kv.getValue());
-    }
-
-    @Override
-    public InProcessTransformResult finishBundle() {
-      Builder resultBuilder = StepTransformResult.withoutHold(application);
-      for (Map.Entry<GroupingKey<K>, List<WindowedValue<V>>> groupedEntry :
-          groupingMap.entrySet()) {
-        K key = groupedEntry.getKey().key;
-        KeyedWorkItem<K, V> groupedKv =
-            KeyedWorkItems.elementsWorkItem(key, groupedEntry.getValue());
-        UncommittedBundle<KeyedWorkItem<K, V>> bundle =
-            evaluationContext.createKeyedBundle(inputBundle, key, application.getOutput());
-        bundle.add(WindowedValue.valueInGlobalWindow(groupedKv));
-        resultBuilder.addOutput(bundle);
-      }
-      return resultBuilder.build();
-    }
-
-    private static class GroupingKey<K> {
-      private K key;
-      private byte[] encodedKey;
-
-      public GroupingKey(K key, byte[] encodedKey) {
-        this.key = key;
-        this.encodedKey = encodedKey;
-      }
-
-      @Override
-      public boolean equals(Object o) {
-        if (o instanceof GroupingKey) {
-          GroupingKey<?> that = (GroupingKey<?>) o;
-          return Arrays.equals(this.encodedKey, that.encodedKey);
-        } else {
-          return false;
-        }
-      }
-
-      @Override
-      public int hashCode() {
-        return Arrays.hashCode(encodedKey);
-      }
-    }
-  }
-
-  /**
-   * A {@link PTransformOverrideFactory} for {@link GroupByKey} PTransforms.
-   */
-  public static final class InProcessGroupByKeyOverrideFactory
-      implements PTransformOverrideFactory {
-    @Override
-    public <InputT extends PInput, OutputT extends POutput> PTransform<InputT, OutputT> override(
-        PTransform<InputT, OutputT> transform) {
-      if (transform instanceof GroupByKey) {
-        @SuppressWarnings({"rawtypes", "unchecked"})
-        PTransform<InputT, OutputT> override = new InProcessGroupByKey((GroupByKey) transform);
-        return override;
-      }
-      return transform;
-    }
-  }
-
-  /**
-   * An in-memory implementation of the {@link GroupByKey} primitive as a composite
-   * {@link PTransform}.
-   */
-  private static final class InProcessGroupByKey<K, V>
-      extends ForwardingPTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> {
-    private final GroupByKey<K, V> original;
-
-    private InProcessGroupByKey(GroupByKey<K, V> from) {
-      this.original = from;
-    }
-
-    @Override
-    public PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> delegate() {
-      return original;
-    }
-
-    @Override
-    public PCollection<KV<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) {
-      KvCoder<K, V> inputCoder = (KvCoder<K, V>) input.getCoder();
-
-      // This operation groups by the combination of key and window,
-      // merging windows as needed, using the windows assigned to the
-      // key/value input elements and the window merge operation of the
-      // window function associated with the input PCollection.
-      WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
-
-      // Use the default GroupAlsoByWindow implementation
-      DoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> groupAlsoByWindow =
-          groupAlsoByWindow(windowingStrategy, inputCoder.getValueCoder());
-
-      // By default, implement GroupByKey via a series of lower-level operations.
-      return input
-          // Make each input element's timestamp and assigned windows
-          // explicit, in the value part.
-          .apply(new ReifyTimestampsAndWindows<K, V>())
-
-          .apply(new InProcessGroupByKeyOnly<K, V>())
-          .setCoder(KeyedWorkItemCoder.of(inputCoder.getKeyCoder(),
-              inputCoder.getValueCoder(), input.getWindowingStrategy().getWindowFn().windowCoder()))
-
-          // Group each key's values by window, merging windows as needed.
-          .apply("GroupAlsoByWindow", ParDo.of(groupAlsoByWindow))
-
-          // And update the windowing strategy as appropriate.
-          .setWindowingStrategyInternal(original.updateWindowingStrategy(windowingStrategy))
-          .setCoder(
-              KvCoder.of(inputCoder.getKeyCoder(), IterableCoder.of(inputCoder.getValueCoder())));
-    }
-
-    private <W extends BoundedWindow>
-        DoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> groupAlsoByWindow(
-            final WindowingStrategy<?, W> windowingStrategy, final Coder<V> inputCoder) {
-      return GroupAlsoByWindowViaWindowSetDoFn.create(
-          windowingStrategy, SystemReduceFn.<K, V, W>buffering(inputCoder));
-    }
-  }
-
-  /**
-   * An implementation primitive to use in the evaluation of a {@link GroupByKey}
-   * {@link PTransform}.
-   */
-  public static final class InProcessGroupByKeyOnly<K, V>
-      extends PTransform<PCollection<KV<K, WindowedValue<V>>>, PCollection<KeyedWorkItem<K, V>>> {
-    @Override
-    public PCollection<KeyedWorkItem<K, V>> apply(PCollection<KV<K, WindowedValue<V>>> input) {
-      return PCollection.<KeyedWorkItem<K, V>>createPrimitiveOutputInternal(
-          input.getPipeline(), input.getWindowingStrategy(), input.isBounded());
-    }
-
-    @VisibleForTesting
-    InProcessGroupByKeyOnly() {}
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aad284a5/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java
index 9eeafbb..f348d93 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java
@@ -19,9 +19,9 @@ package org.apache.beam.runners.direct;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
-import org.apache.beam.runners.direct.GroupByKeyEvaluatorFactory.InProcessGroupByKeyOnly;
 import org.apache.beam.runners.direct.InMemoryWatermarkManager.FiredTimers;
 import org.apache.beam.runners.direct.InMemoryWatermarkManager.TransformWatermarks;
+import org.apache.beam.runners.direct.InProcessGroupByKey.InProcessGroupByKeyOnly;
 import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
 import org.apache.beam.runners.direct.InProcessPipelineRunner.PCollectionViewWriter;
 import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aad284a5/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupAlsoByWindowEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupAlsoByWindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupAlsoByWindowEvaluatorFactory.java
new file mode 100644
index 0000000..5ded8b6
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupAlsoByWindowEvaluatorFactory.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn;
+import org.apache.beam.runners.direct.InProcessGroupByKey.InProcessGroupAlsoByWindow;
+import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly;
+import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly;
+import org.apache.beam.sdk.util.KeyedWorkItem;
+import org.apache.beam.sdk.util.SystemReduceFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+
+import com.google.common.collect.ImmutableMap;
+
+import java.util.Collections;
+
+/**
+ * The {@link InProcessPipelineRunner} {@link TransformEvaluatorFactory} for the
+ * {@link InProcessGroupAlsoByWindow} {@link PTransform}.
+ */
+class InProcessGroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory {
+  @Override
+  public <InputT> TransformEvaluator<InputT> forApplication(
+      AppliedPTransform<?, ?, ?> application,
+      CommittedBundle<?> inputBundle,
+      InProcessEvaluationContext evaluationContext) {
+    @SuppressWarnings({"cast", "unchecked", "rawtypes"})
+    TransformEvaluator<InputT> evaluator =
+        createEvaluator(
+            (AppliedPTransform) application, (CommittedBundle) inputBundle, evaluationContext);
+    return evaluator;
+  }
+
+  private <K, V> TransformEvaluator<KeyedWorkItem<K, V>> createEvaluator(
+      AppliedPTransform<
+              PCollection<KeyedWorkItem<K, V>>, PCollection<KV<K, Iterable<V>>>,
+              InProcessGroupAlsoByWindow<K, V>>
+          application,
+      CommittedBundle<KeyedWorkItem<K, V>> inputBundle,
+      InProcessEvaluationContext evaluationContext) {
+    return new InProcessGroupAlsoByWindowEvaluator<K, V>(
+        evaluationContext, inputBundle, application);
+  }
+
+  /**
+   * A transform evaluator for the pseudo-primitive {@link InProcessGroupAlsoByWindow}. Windowing
+   * is ignored; all input should be in the global window since all output will be as well.
+   *
+   * @see GroupByKeyViaGroupByKeyOnly
+   */
+  private static class InProcessGroupAlsoByWindowEvaluator<K, V>
+      implements TransformEvaluator<KeyedWorkItem<K, V>> {
+
+    private final TransformEvaluator<KeyedWorkItem<K, V>> gabwParDoEvaluator;
+
+    public InProcessGroupAlsoByWindowEvaluator(
+        final InProcessEvaluationContext evaluationContext,
+        CommittedBundle<KeyedWorkItem<K, V>> inputBundle,
+        final AppliedPTransform<
+                PCollection<KeyedWorkItem<K, V>>, PCollection<KV<K, Iterable<V>>>,
+                InProcessGroupAlsoByWindow<K, V>>
+            application) {
+
+      Coder<V> valueCoder =
+          application.getTransform().getValueCoder(inputBundle.getPCollection().getCoder());
+
+      @SuppressWarnings("unchecked")
+      WindowingStrategy<?, BoundedWindow> windowingStrategy =
+          (WindowingStrategy<?, BoundedWindow>) application.getTransform().getWindowingStrategy();
+
+      DoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> gabwDoFn =
+          GroupAlsoByWindowViaWindowSetDoFn.create(
+              windowingStrategy,
+              SystemReduceFn.<K, V, BoundedWindow>buffering(valueCoder));
+
+      TupleTag<KV<K, Iterable<V>>> mainOutputTag = new TupleTag<KV<K, Iterable<V>>>() {};
+
+      // Not technically legit, as the application is not a ParDo
+      this.gabwParDoEvaluator =
+          ParDoInProcessEvaluator.create(
+              evaluationContext,
+              inputBundle,
+              application,
+              gabwDoFn,
+              Collections.<PCollectionView<?>>emptyList(),
+              mainOutputTag,
+              Collections.<TupleTag<?>>emptyList(),
+              ImmutableMap.<TupleTag<?>, PCollection<?>>of(mainOutputTag, application.getOutput()));
+    }
+
+    @Override
+    public void processElement(WindowedValue<KeyedWorkItem<K, V>> element) throws Exception {
+      gabwParDoEvaluator.processElement(element);
+    }
+
+    @Override
+    public InProcessTransformResult finishBundle() throws Exception {
+      return gabwParDoEvaluator.finishBundle();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aad284a5/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupByKey.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupByKey.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupByKey.java
new file mode 100644
index 0000000..026b4d5
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupByKey.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.direct;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly.ReifyTimestampsAndWindows;
+import org.apache.beam.sdk.util.KeyedWorkItem;
+import org.apache.beam.sdk.util.KeyedWorkItemCoder;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+
+class InProcessGroupByKey<K, V>
+    extends ForwardingPTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> {
+  private final GroupByKey<K, V> original;
+
+  InProcessGroupByKey(GroupByKey<K, V> from) {
+    this.original = from;
+  }
+
+  @Override
+  public PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> delegate() {
+    return original;
+  }
+
+  @Override
+  public PCollection<KV<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) {
+    @SuppressWarnings("unchecked")
+    KvCoder<K, V> inputCoder = (KvCoder<K, V>) input.getCoder();
+
+    // This operation groups by the combination of key and window,
+    // merging windows as needed, using the windows assigned to the
+    // key/value input elements and the window merge operation of the
+    // window function associated with the input PCollection.
+    WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
+
+    // By default, implement GroupByKey via a series of lower-level operations.
+    return input
+        // Make each input element's timestamp and assigned windows
+        // explicit, in the value part.
+        .apply(new ReifyTimestampsAndWindows<K, V>())
+        .apply(new InProcessGroupByKeyOnly<K, V>())
+        .setCoder(
+            KeyedWorkItemCoder.of(
+                inputCoder.getKeyCoder(),
+                inputCoder.getValueCoder(),
+                input.getWindowingStrategy().getWindowFn().windowCoder()))
+
+        // Group each key's values by window, merging windows as needed.
+        .apply("GroupAlsoByWindow", new InProcessGroupAlsoByWindow<K, V>(windowingStrategy))
+
+        // And update the windowing strategy as appropriate.
+        .setWindowingStrategyInternal(original.updateWindowingStrategy(windowingStrategy))
+        .setCoder(
+            KvCoder.of(inputCoder.getKeyCoder(), IterableCoder.of(inputCoder.getValueCoder())));
+  }
+
+  static final class InProcessGroupByKeyOnly<K, V>
+      extends PTransform<PCollection<KV<K, WindowedValue<V>>>, PCollection<KeyedWorkItem<K, V>>> {
+    @Override
+    public PCollection<KeyedWorkItem<K, V>> apply(PCollection<KV<K, WindowedValue<V>>> input) {
+      return PCollection.<KeyedWorkItem<K, V>>createPrimitiveOutputInternal(
+          input.getPipeline(), input.getWindowingStrategy(), input.isBounded());
+    }
+
+    InProcessGroupByKeyOnly() {}
+  }
+
+  static final class InProcessGroupAlsoByWindow<K, V>
+      extends PTransform<PCollection<KeyedWorkItem<K, V>>, PCollection<KV<K, Iterable<V>>>> {
+
+    private final WindowingStrategy<?, ?> windowingStrategy;
+
+    public InProcessGroupAlsoByWindow(WindowingStrategy<?, ?> windowingStrategy) {
+      this.windowingStrategy = windowingStrategy;
+    }
+
+    public WindowingStrategy<?, ?> getWindowingStrategy() {
+      return windowingStrategy;
+    }
+
+    /** Narrows {@code inputCoder} to a {@link KeyedWorkItemCoder}; any other coder is rejected. */
+    private KeyedWorkItemCoder<K, V> getKeyedWorkItemCoder(Coder<KeyedWorkItem<K, V>> inputCoder) {
+      checkArgument(
+          inputCoder instanceof KeyedWorkItemCoder,
+          "%s requires a %s<...> but got %s",
+          getClass().getSimpleName(),
+          KeyedWorkItemCoder.class.getSimpleName(),
+          inputCoder);
+      @SuppressWarnings("unchecked")
+      KeyedWorkItemCoder<K, V> keyedWorkItemCoder = (KeyedWorkItemCoder<K, V>) inputCoder;
+      return keyedWorkItemCoder;
+    }
+
+    public Coder<K> getKeyCoder(Coder<KeyedWorkItem<K, V>> inputCoder) {
+      return getKeyedWorkItemCoder(inputCoder).getKeyCoder();
+    }
+
+    public Coder<V> getValueCoder(Coder<KeyedWorkItem<K, V>> inputCoder) {
+      return getKeyedWorkItemCoder(inputCoder).getElementCoder();
+    }
+
+    @Override
+    public PCollection<KV<K, Iterable<V>>> apply(PCollection<KeyedWorkItem<K, V>> input) {
+      return PCollection.<KV<K, Iterable<V>>>createPrimitiveOutputInternal(
+          input.getPipeline(), input.getWindowingStrategy(), input.isBounded());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aad284a5/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupByKeyOnlyEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupByKeyOnlyEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupByKeyOnlyEvaluatorFactory.java
new file mode 100644
index 0000000..79db5b6
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupByKeyOnlyEvaluatorFactory.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import static org.apache.beam.sdk.util.CoderUtils.encodeToByteArray;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import org.apache.beam.runners.direct.InProcessGroupByKey.InProcessGroupByKeyOnly;
+import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
+import org.apache.beam.runners.direct.StepTransformResult.Builder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly;
+import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly;
+import org.apache.beam.sdk.util.KeyedWorkItem;
+import org.apache.beam.sdk.util.KeyedWorkItems;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The {@link InProcessPipelineRunner} {@link TransformEvaluatorFactory} for the
+ * {@link GroupByKeyOnly} {@link PTransform}.
+ */
+class InProcessGroupByKeyOnlyEvaluatorFactory implements TransformEvaluatorFactory {
+  @Override
+  public <InputT> TransformEvaluator<InputT> forApplication(
+      AppliedPTransform<?, ?, ?> application,
+      CommittedBundle<?> inputBundle,
+      InProcessEvaluationContext evaluationContext) {
+    @SuppressWarnings({"cast", "unchecked", "rawtypes"})
+    TransformEvaluator<InputT> evaluator =
+        createEvaluator(
+            (AppliedPTransform) application, (CommittedBundle) inputBundle, evaluationContext);
+    return evaluator;
+  }
+
+  private <K, V> TransformEvaluator<KV<K, WindowedValue<V>>> createEvaluator(
+      final AppliedPTransform<
+              PCollection<KV<K, WindowedValue<V>>>, PCollection<KeyedWorkItem<K, V>>,
+              InProcessGroupByKeyOnly<K, V>>
+          application,
+      final CommittedBundle<KV<K, WindowedValue<V>>> inputBundle,
+      final InProcessEvaluationContext evaluationContext) {
+    return new InProcessGroupByKeyOnlyEvaluator<K, V>(evaluationContext, inputBundle, application);
+  }
+
+  /**
+   * A transform evaluator for the pseudo-primitive {@link GroupByKeyOnly}. Windowing is ignored;
+   * all input should be in the global window since all output will be as well.
+   *
+   * @see GroupByKeyViaGroupByKeyOnly
+   */
+  private static class InProcessGroupByKeyOnlyEvaluator<K, V>
+      implements TransformEvaluator<KV<K, WindowedValue<V>>> {
+    private final InProcessEvaluationContext evaluationContext;
+
+    private final CommittedBundle<KV<K, WindowedValue<V>>> inputBundle;
+    private final AppliedPTransform<
+            PCollection<KV<K, WindowedValue<V>>>, PCollection<KeyedWorkItem<K, V>>,
+            InProcessGroupByKeyOnly<K, V>>
+        application;
+    private final Coder<K> keyCoder;
+    private Map<GroupingKey<K>, List<WindowedValue<V>>> groupingMap;
+
+    public InProcessGroupByKeyOnlyEvaluator(
+        InProcessEvaluationContext evaluationContext,
+        CommittedBundle<KV<K, WindowedValue<V>>> inputBundle,
+        AppliedPTransform<
+                PCollection<KV<K, WindowedValue<V>>>, PCollection<KeyedWorkItem<K, V>>,
+                InProcessGroupByKeyOnly<K, V>>
+            application) {
+      this.evaluationContext = evaluationContext;
+      this.inputBundle = inputBundle;
+      this.application = application;
+      this.keyCoder = getKeyCoder(application.getInput().getCoder());
+      this.groupingMap = new HashMap<>();
+    }
+
+    private Coder<K> getKeyCoder(Coder<KV<K, WindowedValue<V>>> coder) {
+      checkState(
+          coder instanceof KvCoder,
+          "%s requires a coder of class %s."
+              + " This is an internal error; this is checked during pipeline construction"
+              + " but became corrupted.",
+          getClass().getSimpleName(),
+          KvCoder.class.getSimpleName());
+      @SuppressWarnings("unchecked")
+      Coder<K> keyCoder = ((KvCoder<K, WindowedValue<V>>) coder).getKeyCoder();
+      return keyCoder;
+    }
+
+    @Override
+    public void processElement(WindowedValue<KV<K, WindowedValue<V>>> element) {
+      KV<K, WindowedValue<V>> kv = element.getValue();
+      K key = kv.getKey();
+      byte[] encodedKey;
+      try {
+        encodedKey = encodeToByteArray(keyCoder, key);
+      } catch (CoderException exn) {
+        // TODO: Put in better element printing:
+        // truncate if too long.
+        throw new IllegalArgumentException(
+            String.format("unable to encode key %s of input to %s using %s", key, this, keyCoder),
+            exn);
+      }
+      GroupingKey<K> groupingKey = new GroupingKey<>(key, encodedKey);
+      List<WindowedValue<V>> values = groupingMap.get(groupingKey);
+      if (values == null) {
+        values = new ArrayList<WindowedValue<V>>();
+        groupingMap.put(groupingKey, values);
+      }
+      values.add(kv.getValue());
+    }
+
+    @Override
+    public InProcessTransformResult finishBundle() {
+      Builder resultBuilder = StepTransformResult.withoutHold(application);
+      for (Map.Entry<GroupingKey<K>, List<WindowedValue<V>>> groupedEntry :
+          groupingMap.entrySet()) {
+        K key = groupedEntry.getKey().key;
+        KeyedWorkItem<K, V> groupedKv =
+            KeyedWorkItems.elementsWorkItem(key, groupedEntry.getValue());
+        UncommittedBundle<KeyedWorkItem<K, V>> bundle =
+            evaluationContext.createKeyedBundle(inputBundle, key, application.getOutput());
+        bundle.add(WindowedValue.valueInGlobalWindow(groupedKv));
+        resultBuilder.addOutput(bundle);
+      }
+      return resultBuilder.build();
+    }
+
+    /**
+     * Map key pairing a user key with its encoded bytes; equality and hashing use the bytes only.
+     */
+    private static class GroupingKey<K> {
+      private final K key;
+      private final byte[] encodedKey;
+
+      public GroupingKey(K key, byte[] encodedKey) {
+        this.key = key;
+        this.encodedKey = encodedKey;
+      }
+
+      @Override
+      public boolean equals(Object o) {
+        // Only the encoded form is compared; the decoded key is never consulted.
+        return o instanceof GroupingKey
+            && Arrays.equals(this.encodedKey, ((GroupingKey<?>) o).encodedKey);
+      }
+
+      @Override
+      public int hashCode() {
+        return Arrays.hashCode(encodedKey);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aad284a5/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupByKeyOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupByKeyOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupByKeyOverrideFactory.java
new file mode 100644
index 0000000..1d84bc9
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupByKeyOverrideFactory.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+
+/**
+ * A {@link PTransformOverrideFactory} for {@link GroupByKey} PTransforms.
+ */
+final class InProcessGroupByKeyOverrideFactory implements PTransformOverrideFactory {
+  @Override
+  public <InputT extends PInput, OutputT extends POutput> PTransform<InputT, OutputT> override(
+      PTransform<InputT, OutputT> transform) {
+    if (!(transform instanceof GroupByKey)) {
+      // Anything that is not a GroupByKey is handed back unchanged.
+      return transform;
+    }
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    PTransform<InputT, OutputT> replacement =
+        (PTransform) new InProcessGroupByKey((GroupByKey) transform);
+    return replacement;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aad284a5/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessPipelineRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessPipelineRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessPipelineRunner.java
index 19e9f47..a7f6941 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessPipelineRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessPipelineRunner.java
@@ -17,8 +17,7 @@
  */
 package org.apache.beam.runners.direct;
 
-import org.apache.beam.runners.direct.GroupByKeyEvaluatorFactory.InProcessGroupByKeyOnly;
-import org.apache.beam.runners.direct.GroupByKeyEvaluatorFactory.InProcessGroupByKeyOverrideFactory;
+import org.apache.beam.runners.direct.InProcessGroupByKey.InProcessGroupByKeyOnly;
 import org.apache.beam.runners.direct.ViewEvaluatorFactory.InProcessViewOverrideFactory;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.Pipeline.PipelineExecutionException;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aad284a5/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
index f449731..81d2520 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.runners.direct;
 
+import org.apache.beam.runners.direct.InProcessGroupByKey.InProcessGroupAlsoByWindow;
+import org.apache.beam.runners.direct.InProcessGroupByKey.InProcessGroupByKeyOnly;
 import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
@@ -44,12 +46,12 @@ class TransformEvaluatorRegistry implements TransformEvaluatorFactory {
             .put(Read.Unbounded.class, new UnboundedReadEvaluatorFactory())
             .put(ParDo.Bound.class, new ParDoSingleEvaluatorFactory())
             .put(ParDo.BoundMulti.class, new ParDoMultiEvaluatorFactory())
-            .put(
-                GroupByKeyEvaluatorFactory.InProcessGroupByKeyOnly.class,
-                new GroupByKeyEvaluatorFactory())
             .put(FlattenPCollectionList.class, new FlattenEvaluatorFactory())
             .put(ViewEvaluatorFactory.WriteView.class, new ViewEvaluatorFactory())
             .put(Window.Bound.class, new WindowEvaluatorFactory())
+            // Runner-specific primitives used in expansion of GroupByKey
+            .put(InProcessGroupByKeyOnly.class, new InProcessGroupByKeyOnlyEvaluatorFactory())
+            .put(InProcessGroupAlsoByWindow.class, new InProcessGroupAlsoByWindowEvaluatorFactory())
             .build();
     return new TransformEvaluatorRegistry(primitives);
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aad284a5/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java
index 267266d..92f845c 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java
@@ -67,7 +67,7 @@ public class GroupByKeyEvaluatorFactoryTest {
     PCollection<KV<String, WindowedValue<Integer>>> kvs =
         values.apply(new ReifyTimestampsAndWindows<String, Integer>());
     PCollection<KeyedWorkItem<String, Integer>> groupedKvs =
-        kvs.apply(new GroupByKeyEvaluatorFactory.InProcessGroupByKeyOnly<String, Integer>());
+        kvs.apply(new InProcessGroupByKey.InProcessGroupByKeyOnly<String, Integer>());
 
     CommittedBundle<KV<String, WindowedValue<Integer>>> inputBundle =
         bundleFactory.createRootBundle(kvs).commit(Instant.now());
@@ -89,7 +89,7 @@ public class GroupByKeyEvaluatorFactoryTest {
     Coder<String> keyCoder =
         ((KvCoder<String, WindowedValue<Integer>>) kvs.getCoder()).getKeyCoder();
     TransformEvaluator<KV<String, WindowedValue<Integer>>> evaluator =
-        new GroupByKeyEvaluatorFactory()
+        new InProcessGroupByKeyOnlyEvaluatorFactory()
             .forApplication(
                 groupedKvs.getProducingTransformInternal(), inputBundle, evaluationContext);
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aad284a5/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessGroupByKeyOnlyEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessGroupByKeyOnlyEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessGroupByKeyOnlyEvaluatorFactoryTest.java
new file mode 100644
index 0000000..1172a4d
--- /dev/null
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessGroupByKeyOnlyEvaluatorFactoryTest.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import static org.hamcrest.Matchers.contains;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly.ReifyTimestampsAndWindows;
+import org.apache.beam.sdk.util.KeyedWorkItem;
+import org.apache.beam.sdk.util.KeyedWorkItems;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+
+import com.google.common.collect.HashMultiset;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Multiset;
+
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link InProcessGroupByKeyOnlyEvaluatorFactory}.
+ */
+@RunWith(JUnit4.class)
+public class InProcessGroupByKeyOnlyEvaluatorFactoryTest {
+  private BundleFactory bundleFactory = InProcessBundleFactory.create();
+
+  @Test
+  public void testInMemoryEvaluator() throws Exception {
+    TestPipeline p = TestPipeline.create();
+    KV<String, Integer> firstFoo = KV.of("foo", -1);
+    KV<String, Integer> secondFoo = KV.of("foo", 1);
+    KV<String, Integer> thirdFoo = KV.of("foo", 3);
+    KV<String, Integer> firstBar = KV.of("bar", 22);
+    KV<String, Integer> secondBar = KV.of("bar", 12);
+    KV<String, Integer> firstBaz = KV.of("baz", Integer.MAX_VALUE);
+    PCollection<KV<String, Integer>> values =
+        p.apply(Create.of(firstFoo, firstBar, secondFoo, firstBaz, secondBar, thirdFoo));
+    PCollection<KV<String, WindowedValue<Integer>>> kvs =
+        values.apply(new ReifyTimestampsAndWindows<String, Integer>());
+    PCollection<KeyedWorkItem<String, Integer>> groupedKvs =
+        kvs.apply(new InProcessGroupByKey.InProcessGroupByKeyOnly<String, Integer>());
+
+    CommittedBundle<KV<String, WindowedValue<Integer>>> inputBundle =
+        bundleFactory.createRootBundle(kvs).commit(Instant.now());
+    InProcessEvaluationContext evaluationContext = mock(InProcessEvaluationContext.class);
+
+    UncommittedBundle<KeyedWorkItem<String, Integer>> fooBundle =
+        bundleFactory.createKeyedBundle(null, "foo", groupedKvs);
+    UncommittedBundle<KeyedWorkItem<String, Integer>> barBundle =
+        bundleFactory.createKeyedBundle(null, "bar", groupedKvs);
+    UncommittedBundle<KeyedWorkItem<String, Integer>> bazBundle =
+        bundleFactory.createKeyedBundle(null, "baz", groupedKvs);
+
+    when(evaluationContext.createKeyedBundle(inputBundle, "foo", groupedKvs)).thenReturn(fooBundle);
+    when(evaluationContext.createKeyedBundle(inputBundle, "bar", groupedKvs)).thenReturn(barBundle);
+    when(evaluationContext.createKeyedBundle(inputBundle, "baz", groupedKvs)).thenReturn(bazBundle);
+
+    // The coder of the input to a GroupByKey is assumed to be a KvCoder
+    @SuppressWarnings("unchecked")
+    Coder<String> keyCoder =
+        ((KvCoder<String, WindowedValue<Integer>>) kvs.getCoder()).getKeyCoder();
+    TransformEvaluator<KV<String, WindowedValue<Integer>>> evaluator =
+        new InProcessGroupByKeyOnlyEvaluatorFactory()
+            .forApplication(
+                groupedKvs.getProducingTransformInternal(), inputBundle, evaluationContext);
+
+    evaluator.processElement(WindowedValue.valueInEmptyWindows(gwValue(firstFoo)));
+    evaluator.processElement(WindowedValue.valueInEmptyWindows(gwValue(secondFoo)));
+    evaluator.processElement(WindowedValue.valueInEmptyWindows(gwValue(thirdFoo)));
+    evaluator.processElement(WindowedValue.valueInEmptyWindows(gwValue(firstBar)));
+    evaluator.processElement(WindowedValue.valueInEmptyWindows(gwValue(secondBar)));
+    evaluator.processElement(WindowedValue.valueInEmptyWindows(gwValue(firstBaz)));
+
+    evaluator.finishBundle();
+
+    assertThat(
+        fooBundle.commit(Instant.now()).getElements(),
+        contains(
+            new KeyedWorkItemMatcher<String, Integer>(
+                KeyedWorkItems.elementsWorkItem(
+                    "foo",
+                    ImmutableSet.of(
+                        WindowedValue.valueInGlobalWindow(-1),
+                        WindowedValue.valueInGlobalWindow(1),
+                        WindowedValue.valueInGlobalWindow(3))),
+                keyCoder)));
+    assertThat(
+        barBundle.commit(Instant.now()).getElements(),
+        contains(
+            new KeyedWorkItemMatcher<String, Integer>(
+                KeyedWorkItems.elementsWorkItem(
+                    "bar",
+                    ImmutableSet.of(
+                        WindowedValue.valueInGlobalWindow(12),
+                        WindowedValue.valueInGlobalWindow(22))),
+                keyCoder)));
+    assertThat(
+        bazBundle.commit(Instant.now()).getElements(),
+        contains(
+            new KeyedWorkItemMatcher<String, Integer>(
+                KeyedWorkItems.elementsWorkItem(
+                    "baz",
+                    ImmutableSet.of(WindowedValue.valueInGlobalWindow(Integer.MAX_VALUE))),
+                keyCoder)));
+  }
+
+  private <K, V> KV<K, WindowedValue<V>> gwValue(KV<K, V> kv) {
+    return KV.of(kv.getKey(), WindowedValue.valueInGlobalWindow(kv.getValue()));
+  }
+
+  private static class KeyedWorkItemMatcher<K, V>
+      extends BaseMatcher<WindowedValue<KeyedWorkItem<K, V>>> {
+    private final KeyedWorkItem<K, V> myWorkItem;
+    private final Coder<K> keyCoder;
+
+    public KeyedWorkItemMatcher(KeyedWorkItem<K, V> myWorkItem, Coder<K> keyCoder) {
+      this.myWorkItem = myWorkItem;
+      this.keyCoder = keyCoder;
+    }
+
+    @Override
+    public boolean matches(Object item) {
+      // instanceof is false for null, so this guard also rejects a null item.
+      if (!(item instanceof WindowedValue)) {
+        return false;
+      }
+      // Unchecked: only the raw WindowedValue type was verified above.
+      @SuppressWarnings("unchecked")
+      KeyedWorkItem<K, V> actual = ((WindowedValue<KeyedWorkItem<K, V>>) item).getValue();
+      // Multisets compare the elements by occurrence count, ignoring iteration order.
+      Multiset<WindowedValue<V>> mine = HashMultiset.create(myWorkItem.elementsIterable());
+      Multiset<WindowedValue<V>> theirs = HashMultiset.create(actual.elementsIterable());
+      if (!mine.equals(theirs)) {
+        return false;
+      }
+      try {
+        Object expectedKey = keyCoder.structuralValue(myWorkItem.key());
+        return expectedKey.equals(keyCoder.structuralValue(actual.key()));
+      } catch (Exception e) {
+        // Any exception while computing the structural key values counts as a mismatch.
+        return false;
+      }
+    }
+
+    @Override
+    public void describeTo(Description description) {
+      description
+          .appendText("KeyedWorkItem<K, V> containing key ")
+          .appendValue(myWorkItem.key())
+          .appendText(" and values ")
+          .appendValueList("[", ", ", "]", myWorkItem.elementsIterable());
+    }
+  }
+}


[3/3] incubator-beam git commit: This closes #268

Posted by ke...@apache.org.
This closes #268


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2f18cd26
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2f18cd26
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2f18cd26

Branch: refs/heads/master
Commit: 2f18cd2687a4dad0510bb29cd62c9dd2323ee70b
Parents: f184bcf aad284a
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu May 19 11:53:49 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu May 19 11:53:49 2016 -0700

----------------------------------------------------------------------
 .../beam/runners/direct/BundleFactory.java      |   2 +-
 .../direct/GroupByKeyEvaluatorFactory.java      | 274 -------------------
 .../direct/InProcessEvaluationContext.java      |   2 +-
 ...rocessGroupAlsoByWindowEvaluatorFactory.java | 127 +++++++++
 .../runners/direct/InProcessGroupByKey.java     | 132 +++++++++
 ...InProcessGroupByKeyOnlyEvaluatorFactory.java | 183 +++++++++++++
 .../InProcessGroupByKeyOverrideFactory.java     |  41 +++
 .../runners/direct/InProcessPipelineRunner.java |   3 +-
 .../direct/TransformEvaluatorRegistry.java      |   8 +-
 .../direct/GroupByKeyEvaluatorFactoryTest.java  |   4 +-
 ...ocessGroupByKeyOnlyEvaluatorFactoryTest.java | 183 +++++++++++++
 .../beam/sdk/util/KeyedWorkItemCoder.java       |   8 +
 12 files changed, 684 insertions(+), 283 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2f18cd26/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java
----------------------------------------------------------------------