You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/05/19 19:04:17 UTC
[1/3] incubator-beam git commit: Add accessors for sub-coders of
KeyedWorkItemCoder
Repository: incubator-beam
Updated Branches:
refs/heads/master f184bcf37 -> 2f18cd268
Add accessors for sub-coders of KeyedWorkItemCoder
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ca1b3a87
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ca1b3a87
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ca1b3a87
Branch: refs/heads/master
Commit: ca1b3a87fbe7218c0af912ba0f0deae8b903b1ac
Parents: 93a5d39
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Apr 28 15:51:40 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Fri May 13 12:02:02 2016 -0700
----------------------------------------------------------------------
.../java/org/apache/beam/sdk/util/KeyedWorkItemCoder.java | 8 ++++++++
1 file changed, 8 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ca1b3a87/sdks/java/core/src/main/java/org/apache/beam/sdk/util/KeyedWorkItemCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/KeyedWorkItemCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/KeyedWorkItemCoder.java
index 763f68b..ec5d821 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/KeyedWorkItemCoder.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/KeyedWorkItemCoder.java
@@ -79,6 +79,14 @@ public class KeyedWorkItemCoder<K, ElemT> extends StandardCoder<KeyedWorkItem<K,
this.elemsCoder = IterableCoder.of(FullWindowedValueCoder.of(elemCoder, windowCoder));
}
+ public Coder<K> getKeyCoder() {
+ return keyCoder;
+ }
+
+ public Coder<ElemT> getElementCoder() {
+ return elemCoder;
+ }
+
@Override
public void encode(KeyedWorkItem<K, ElemT> value, OutputStream outStream, Coder.Context context)
throws CoderException, IOException {
[2/3] incubator-beam git commit: Make in-process GroupByKey respect
future Beam model
Posted by ke...@apache.org.
Make in-process GroupByKey respect future Beam model
This introduces or clarifies the following transforms:
- InProcessGroupByKey, which expands like GroupByKeyViaGroupByKeyOnly
but with different intermediate PCollection types.
- InProcessGroupByKeyOnly, which outputs KeyedWorkItem<K, V>. This existed
already under a different name.
- InProcessGroupAlsoByWindow, which is evaluated directly and
accepts input elements of type KeyedWorkItem<K, V>.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/aad284a5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/aad284a5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/aad284a5
Branch: refs/heads/master
Commit: aad284a513e829552e9ae7fa10ea89cfd89bdb5f
Parents: ca1b3a8
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Apr 28 16:12:21 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Fri May 13 12:04:31 2016 -0700
----------------------------------------------------------------------
.../beam/runners/direct/BundleFactory.java | 2 +-
.../direct/GroupByKeyEvaluatorFactory.java | 274 -------------------
.../direct/InProcessEvaluationContext.java | 2 +-
...rocessGroupAlsoByWindowEvaluatorFactory.java | 127 +++++++++
.../runners/direct/InProcessGroupByKey.java | 132 +++++++++
...InProcessGroupByKeyOnlyEvaluatorFactory.java | 183 +++++++++++++
.../InProcessGroupByKeyOverrideFactory.java | 41 +++
.../runners/direct/InProcessPipelineRunner.java | 3 +-
.../direct/TransformEvaluatorRegistry.java | 8 +-
.../direct/GroupByKeyEvaluatorFactoryTest.java | 4 +-
...ocessGroupByKeyOnlyEvaluatorFactoryTest.java | 183 +++++++++++++
11 files changed, 676 insertions(+), 283 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aad284a5/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BundleFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BundleFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BundleFactory.java
index 34529e7..fea4841 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BundleFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BundleFactory.java
@@ -17,7 +17,7 @@
*/
package org.apache.beam.runners.direct;
-import org.apache.beam.runners.direct.GroupByKeyEvaluatorFactory.InProcessGroupByKeyOnly;
+import org.apache.beam.runners.direct.InProcessGroupByKey.InProcessGroupByKeyOnly;
import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
import org.apache.beam.sdk.transforms.PTransform;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aad284a5/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactory.java
deleted file mode 100644
index 9a08996..0000000
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactory.java
+++ /dev/null
@@ -1,274 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.direct;
-
-import static org.apache.beam.sdk.util.CoderUtils.encodeToByteArray;
-
-import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
-import org.apache.beam.runners.direct.StepTransformResult.Builder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.IterableCoder;
-import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly.ReifyTimestampsAndWindows;
-import org.apache.beam.sdk.util.KeyedWorkItem;
-import org.apache.beam.sdk.util.KeyedWorkItemCoder;
-import org.apache.beam.sdk.util.KeyedWorkItems;
-import org.apache.beam.sdk.util.SystemReduceFn;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PInput;
-import org.apache.beam.sdk.values.POutput;
-
-import com.google.common.annotations.VisibleForTesting;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * The {@link InProcessPipelineRunner} {@link TransformEvaluatorFactory} for the {@link GroupByKey}
- * {@link PTransform}.
- */
-class GroupByKeyEvaluatorFactory implements TransformEvaluatorFactory {
- @Override
- public <InputT> TransformEvaluator<InputT> forApplication(
- AppliedPTransform<?, ?, ?> application,
- CommittedBundle<?> inputBundle,
- InProcessEvaluationContext evaluationContext) {
- @SuppressWarnings({"cast", "unchecked", "rawtypes"})
- TransformEvaluator<InputT> evaluator = createEvaluator(
- (AppliedPTransform) application, (CommittedBundle) inputBundle, evaluationContext);
- return evaluator;
- }
-
- private <K, V> TransformEvaluator<KV<K, WindowedValue<V>>> createEvaluator(
- final AppliedPTransform<
- PCollection<KV<K, WindowedValue<V>>>, PCollection<KeyedWorkItem<K, V>>,
- InProcessGroupByKeyOnly<K, V>>
- application,
- final CommittedBundle<KV<K, V>> inputBundle,
- final InProcessEvaluationContext evaluationContext) {
- return new GroupByKeyEvaluator<K, V>(evaluationContext, inputBundle, application);
- }
-
- private static class GroupByKeyEvaluator<K, V>
- implements TransformEvaluator<KV<K, WindowedValue<V>>> {
- private final InProcessEvaluationContext evaluationContext;
-
- private final CommittedBundle<KV<K, V>> inputBundle;
- private final AppliedPTransform<
- PCollection<KV<K, WindowedValue<V>>>, PCollection<KeyedWorkItem<K, V>>,
- InProcessGroupByKeyOnly<K, V>>
- application;
- private final Coder<K> keyCoder;
- private Map<GroupingKey<K>, List<WindowedValue<V>>> groupingMap;
-
- public GroupByKeyEvaluator(
- InProcessEvaluationContext evaluationContext,
- CommittedBundle<KV<K, V>> inputBundle,
- AppliedPTransform<
- PCollection<KV<K, WindowedValue<V>>>, PCollection<KeyedWorkItem<K, V>>,
- InProcessGroupByKeyOnly<K, V>>
- application) {
- this.evaluationContext = evaluationContext;
- this.inputBundle = inputBundle;
- this.application = application;
-
- PCollection<KV<K, WindowedValue<V>>> input = application.getInput();
- keyCoder = getKeyCoder(input.getCoder());
- groupingMap = new HashMap<>();
- }
-
- private Coder<K> getKeyCoder(Coder<KV<K, WindowedValue<V>>> coder) {
- if (!(coder instanceof KvCoder)) {
- throw new IllegalStateException();
- }
- @SuppressWarnings("unchecked")
- Coder<K> keyCoder = ((KvCoder<K, WindowedValue<V>>) coder).getKeyCoder();
- return keyCoder;
- }
-
- @Override
- public void processElement(WindowedValue<KV<K, WindowedValue<V>>> element) {
- KV<K, WindowedValue<V>> kv = element.getValue();
- K key = kv.getKey();
- byte[] encodedKey;
- try {
- encodedKey = encodeToByteArray(keyCoder, key);
- } catch (CoderException exn) {
- // TODO: Put in better element printing:
- // truncate if too long.
- throw new IllegalArgumentException(
- String.format("unable to encode key %s of input to %s using %s", key, this, keyCoder),
- exn);
- }
- GroupingKey<K> groupingKey = new GroupingKey<>(key, encodedKey);
- List<WindowedValue<V>> values = groupingMap.get(groupingKey);
- if (values == null) {
- values = new ArrayList<WindowedValue<V>>();
- groupingMap.put(groupingKey, values);
- }
- values.add(kv.getValue());
- }
-
- @Override
- public InProcessTransformResult finishBundle() {
- Builder resultBuilder = StepTransformResult.withoutHold(application);
- for (Map.Entry<GroupingKey<K>, List<WindowedValue<V>>> groupedEntry :
- groupingMap.entrySet()) {
- K key = groupedEntry.getKey().key;
- KeyedWorkItem<K, V> groupedKv =
- KeyedWorkItems.elementsWorkItem(key, groupedEntry.getValue());
- UncommittedBundle<KeyedWorkItem<K, V>> bundle =
- evaluationContext.createKeyedBundle(inputBundle, key, application.getOutput());
- bundle.add(WindowedValue.valueInGlobalWindow(groupedKv));
- resultBuilder.addOutput(bundle);
- }
- return resultBuilder.build();
- }
-
- private static class GroupingKey<K> {
- private K key;
- private byte[] encodedKey;
-
- public GroupingKey(K key, byte[] encodedKey) {
- this.key = key;
- this.encodedKey = encodedKey;
- }
-
- @Override
- public boolean equals(Object o) {
- if (o instanceof GroupingKey) {
- GroupingKey<?> that = (GroupingKey<?>) o;
- return Arrays.equals(this.encodedKey, that.encodedKey);
- } else {
- return false;
- }
- }
-
- @Override
- public int hashCode() {
- return Arrays.hashCode(encodedKey);
- }
- }
- }
-
- /**
- * A {@link PTransformOverrideFactory} for {@link GroupByKey} PTransforms.
- */
- public static final class InProcessGroupByKeyOverrideFactory
- implements PTransformOverrideFactory {
- @Override
- public <InputT extends PInput, OutputT extends POutput> PTransform<InputT, OutputT> override(
- PTransform<InputT, OutputT> transform) {
- if (transform instanceof GroupByKey) {
- @SuppressWarnings({"rawtypes", "unchecked"})
- PTransform<InputT, OutputT> override = new InProcessGroupByKey((GroupByKey) transform);
- return override;
- }
- return transform;
- }
- }
-
- /**
- * An in-memory implementation of the {@link GroupByKey} primitive as a composite
- * {@link PTransform}.
- */
- private static final class InProcessGroupByKey<K, V>
- extends ForwardingPTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> {
- private final GroupByKey<K, V> original;
-
- private InProcessGroupByKey(GroupByKey<K, V> from) {
- this.original = from;
- }
-
- @Override
- public PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> delegate() {
- return original;
- }
-
- @Override
- public PCollection<KV<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) {
- KvCoder<K, V> inputCoder = (KvCoder<K, V>) input.getCoder();
-
- // This operation groups by the combination of key and window,
- // merging windows as needed, using the windows assigned to the
- // key/value input elements and the window merge operation of the
- // window function associated with the input PCollection.
- WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
-
- // Use the default GroupAlsoByWindow implementation
- DoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> groupAlsoByWindow =
- groupAlsoByWindow(windowingStrategy, inputCoder.getValueCoder());
-
- // By default, implement GroupByKey via a series of lower-level operations.
- return input
- // Make each input element's timestamp and assigned windows
- // explicit, in the value part.
- .apply(new ReifyTimestampsAndWindows<K, V>())
-
- .apply(new InProcessGroupByKeyOnly<K, V>())
- .setCoder(KeyedWorkItemCoder.of(inputCoder.getKeyCoder(),
- inputCoder.getValueCoder(), input.getWindowingStrategy().getWindowFn().windowCoder()))
-
- // Group each key's values by window, merging windows as needed.
- .apply("GroupAlsoByWindow", ParDo.of(groupAlsoByWindow))
-
- // And update the windowing strategy as appropriate.
- .setWindowingStrategyInternal(original.updateWindowingStrategy(windowingStrategy))
- .setCoder(
- KvCoder.of(inputCoder.getKeyCoder(), IterableCoder.of(inputCoder.getValueCoder())));
- }
-
- private <W extends BoundedWindow>
- DoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> groupAlsoByWindow(
- final WindowingStrategy<?, W> windowingStrategy, final Coder<V> inputCoder) {
- return GroupAlsoByWindowViaWindowSetDoFn.create(
- windowingStrategy, SystemReduceFn.<K, V, W>buffering(inputCoder));
- }
- }
-
- /**
- * An implementation primitive to use in the evaluation of a {@link GroupByKey}
- * {@link PTransform}.
- */
- public static final class InProcessGroupByKeyOnly<K, V>
- extends PTransform<PCollection<KV<K, WindowedValue<V>>>, PCollection<KeyedWorkItem<K, V>>> {
- @Override
- public PCollection<KeyedWorkItem<K, V>> apply(PCollection<KV<K, WindowedValue<V>>> input) {
- return PCollection.<KeyedWorkItem<K, V>>createPrimitiveOutputInternal(
- input.getPipeline(), input.getWindowingStrategy(), input.isBounded());
- }
-
- @VisibleForTesting
- InProcessGroupByKeyOnly() {}
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aad284a5/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java
index 9eeafbb..f348d93 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java
@@ -19,9 +19,9 @@ package org.apache.beam.runners.direct;
import static com.google.common.base.Preconditions.checkNotNull;
-import org.apache.beam.runners.direct.GroupByKeyEvaluatorFactory.InProcessGroupByKeyOnly;
import org.apache.beam.runners.direct.InMemoryWatermarkManager.FiredTimers;
import org.apache.beam.runners.direct.InMemoryWatermarkManager.TransformWatermarks;
+import org.apache.beam.runners.direct.InProcessGroupByKey.InProcessGroupByKeyOnly;
import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
import org.apache.beam.runners.direct.InProcessPipelineRunner.PCollectionViewWriter;
import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aad284a5/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupAlsoByWindowEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupAlsoByWindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupAlsoByWindowEvaluatorFactory.java
new file mode 100644
index 0000000..5ded8b6
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupAlsoByWindowEvaluatorFactory.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn;
+import org.apache.beam.runners.direct.InProcessGroupByKey.InProcessGroupAlsoByWindow;
+import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly;
+import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly;
+import org.apache.beam.sdk.util.KeyedWorkItem;
+import org.apache.beam.sdk.util.SystemReduceFn;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+
+import com.google.common.collect.ImmutableMap;
+
+import java.util.Collections;
+
+/**
+ * The {@link InProcessPipelineRunner} {@link TransformEvaluatorFactory} for the
+ * {@link InProcessGroupAlsoByWindow} {@link PTransform}.
+ */
+class InProcessGroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory {
+ @Override
+ public <InputT> TransformEvaluator<InputT> forApplication(
+ AppliedPTransform<?, ?, ?> application,
+ CommittedBundle<?> inputBundle,
+ InProcessEvaluationContext evaluationContext) {
+ @SuppressWarnings({"cast", "unchecked", "rawtypes"})
+ TransformEvaluator<InputT> evaluator =
+ createEvaluator(
+ (AppliedPTransform) application, (CommittedBundle) inputBundle, evaluationContext);
+ return evaluator;
+ }
+
+ private <K, V> TransformEvaluator<KeyedWorkItem<K, V>> createEvaluator(
+ AppliedPTransform<
+ PCollection<KeyedWorkItem<K, V>>, PCollection<KV<K, Iterable<V>>>,
+ InProcessGroupAlsoByWindow<K, V>>
+ application,
+ CommittedBundle<KeyedWorkItem<K, V>> inputBundle,
+ InProcessEvaluationContext evaluationContext) {
+ return new InProcessGroupAlsoByWindowEvaluator<K, V>(
+ evaluationContext, inputBundle, application);
+ }
+
+ /**
+ * A transform evaluator for the pseudo-primitive {@link InProcessGroupAlsoByWindow}. Windowing
+ * is ignored; all input should be in the global window since all output will be as well.
+ *
+ * @see GroupByKeyViaGroupByKeyOnly
+ */
+ private static class InProcessGroupAlsoByWindowEvaluator<K, V>
+ implements TransformEvaluator<KeyedWorkItem<K, V>> {
+
+ private final TransformEvaluator<KeyedWorkItem<K, V>> gabwParDoEvaluator;
+
+ public InProcessGroupAlsoByWindowEvaluator(
+ final InProcessEvaluationContext evaluationContext,
+ CommittedBundle<KeyedWorkItem<K, V>> inputBundle,
+ final AppliedPTransform<
+ PCollection<KeyedWorkItem<K, V>>, PCollection<KV<K, Iterable<V>>>,
+ InProcessGroupAlsoByWindow<K, V>>
+ application) {
+
+ Coder<V> valueCoder =
+ application.getTransform().getValueCoder(inputBundle.getPCollection().getCoder());
+
+ @SuppressWarnings("unchecked")
+ WindowingStrategy<?, BoundedWindow> windowingStrategy =
+ (WindowingStrategy<?, BoundedWindow>) application.getTransform().getWindowingStrategy();
+
+ DoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> gabwDoFn =
+ GroupAlsoByWindowViaWindowSetDoFn.create(
+ windowingStrategy,
+ SystemReduceFn.<K, V, BoundedWindow>buffering(valueCoder));
+
+ TupleTag<KV<K, Iterable<V>>> mainOutputTag = new TupleTag<KV<K, Iterable<V>>>() {};
+
+ // Not technically legit, as the application is not a ParDo
+ this.gabwParDoEvaluator =
+ ParDoInProcessEvaluator.create(
+ evaluationContext,
+ inputBundle,
+ application,
+ gabwDoFn,
+ Collections.<PCollectionView<?>>emptyList(),
+ mainOutputTag,
+ Collections.<TupleTag<?>>emptyList(),
+ ImmutableMap.<TupleTag<?>, PCollection<?>>of(mainOutputTag, application.getOutput()));
+ }
+
+ @Override
+ public void processElement(WindowedValue<KeyedWorkItem<K, V>> element) throws Exception {
+ gabwParDoEvaluator.processElement(element);
+ }
+
+ @Override
+ public InProcessTransformResult finishBundle() throws Exception {
+ return gabwParDoEvaluator.finishBundle();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aad284a5/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupByKey.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupByKey.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupByKey.java
new file mode 100644
index 0000000..026b4d5
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupByKey.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.direct;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly.ReifyTimestampsAndWindows;
+import org.apache.beam.sdk.util.KeyedWorkItem;
+import org.apache.beam.sdk.util.KeyedWorkItemCoder;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+
+class InProcessGroupByKey<K, V>
+ extends ForwardingPTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> {
+ private final GroupByKey<K, V> original;
+
+ InProcessGroupByKey(GroupByKey<K, V> from) {
+ this.original = from;
+ }
+
+ @Override
+ public PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> delegate() {
+ return original;
+ }
+
+ @Override
+ public PCollection<KV<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) {
+ @SuppressWarnings("unchecked")
+ KvCoder<K, V> inputCoder = (KvCoder<K, V>) input.getCoder();
+
+ // This operation groups by the combination of key and window,
+ // merging windows as needed, using the windows assigned to the
+ // key/value input elements and the window merge operation of the
+ // window function associated with the input PCollection.
+ WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
+
+ // By default, implement GroupByKey via a series of lower-level operations.
+ return input
+ // Make each input element's timestamp and assigned windows
+ // explicit, in the value part.
+ .apply(new ReifyTimestampsAndWindows<K, V>())
+ .apply(new InProcessGroupByKeyOnly<K, V>())
+ .setCoder(
+ KeyedWorkItemCoder.of(
+ inputCoder.getKeyCoder(),
+ inputCoder.getValueCoder(),
+ input.getWindowingStrategy().getWindowFn().windowCoder()))
+
+ // Group each key's values by window, merging windows as needed.
+ .apply("GroupAlsoByWindow", new InProcessGroupAlsoByWindow<K, V>(windowingStrategy))
+
+ // And update the windowing strategy as appropriate.
+ .setWindowingStrategyInternal(original.updateWindowingStrategy(windowingStrategy))
+ .setCoder(
+ KvCoder.of(inputCoder.getKeyCoder(), IterableCoder.of(inputCoder.getValueCoder())));
+ }
+
+ static final class InProcessGroupByKeyOnly<K, V>
+ extends PTransform<PCollection<KV<K, WindowedValue<V>>>, PCollection<KeyedWorkItem<K, V>>> {
+ @Override
+ public PCollection<KeyedWorkItem<K, V>> apply(PCollection<KV<K, WindowedValue<V>>> input) {
+ return PCollection.<KeyedWorkItem<K, V>>createPrimitiveOutputInternal(
+ input.getPipeline(), input.getWindowingStrategy(), input.isBounded());
+ }
+
+ InProcessGroupByKeyOnly() {}
+ }
+
+ static final class InProcessGroupAlsoByWindow<K, V>
+ extends PTransform<PCollection<KeyedWorkItem<K, V>>, PCollection<KV<K, Iterable<V>>>> {
+
+ private final WindowingStrategy<?, ?> windowingStrategy;
+
+ public InProcessGroupAlsoByWindow(WindowingStrategy<?, ?> windowingStrategy) {
+ this.windowingStrategy = windowingStrategy;
+ }
+
+ public WindowingStrategy<?, ?> getWindowingStrategy() {
+ return windowingStrategy;
+ }
+
+ private KeyedWorkItemCoder<K, V> getKeyedWorkItemCoder(Coder<KeyedWorkItem<K, V>> inputCoder) {
+ // Narrows Coder<KeyedWorkItem<...>> to KeyedWorkItemCoder<...>; rejects any other coder type.
+ checkArgument(
+ inputCoder instanceof KeyedWorkItemCoder,
+ "%s requires a %s<...> but got %s",
+ getClass().getSimpleName(),
+ KeyedWorkItemCoder.class.getSimpleName(),
+ inputCoder);
+ @SuppressWarnings("unchecked")
+ KeyedWorkItemCoder<K, V> keyedWorkItemCoder = (KeyedWorkItemCoder<K, V>) inputCoder;
+ return keyedWorkItemCoder;
+ }
+
+ public Coder<K> getKeyCoder(Coder<KeyedWorkItem<K, V>> inputCoder) {
+ return getKeyedWorkItemCoder(inputCoder).getKeyCoder();
+ }
+
+ public Coder<V> getValueCoder(Coder<KeyedWorkItem<K, V>> inputCoder) {
+ return getKeyedWorkItemCoder(inputCoder).getElementCoder();
+ }
+
+ @Override
+ public PCollection<KV<K, Iterable<V>>> apply(PCollection<KeyedWorkItem<K, V>> input) {
+ return PCollection.<KV<K, Iterable<V>>>createPrimitiveOutputInternal(
+ input.getPipeline(), input.getWindowingStrategy(), input.isBounded());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aad284a5/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupByKeyOnlyEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupByKeyOnlyEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupByKeyOnlyEvaluatorFactory.java
new file mode 100644
index 0000000..79db5b6
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupByKeyOnlyEvaluatorFactory.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import static org.apache.beam.sdk.util.CoderUtils.encodeToByteArray;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import org.apache.beam.runners.direct.InProcessGroupByKey.InProcessGroupByKeyOnly;
+import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
+import org.apache.beam.runners.direct.StepTransformResult.Builder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly;
+import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly;
+import org.apache.beam.sdk.util.KeyedWorkItem;
+import org.apache.beam.sdk.util.KeyedWorkItems;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The {@link InProcessPipelineRunner} {@link TransformEvaluatorFactory} for the
+ * {@link GroupByKeyOnly} {@link PTransform}.
+ */
+class InProcessGroupByKeyOnlyEvaluatorFactory implements TransformEvaluatorFactory {
+ @Override
+ public <InputT> TransformEvaluator<InputT> forApplication(
+ AppliedPTransform<?, ?, ?> application,
+ CommittedBundle<?> inputBundle,
+ InProcessEvaluationContext evaluationContext) {
+ @SuppressWarnings({"cast", "unchecked", "rawtypes"})
+ TransformEvaluator<InputT> evaluator =
+ createEvaluator(
+ (AppliedPTransform) application, (CommittedBundle) inputBundle, evaluationContext);
+ return evaluator;
+ }
+
+ private <K, V> TransformEvaluator<KV<K, WindowedValue<V>>> createEvaluator(
+ final AppliedPTransform<
+ PCollection<KV<K, WindowedValue<V>>>, PCollection<KeyedWorkItem<K, V>>,
+ InProcessGroupByKeyOnly<K, V>>
+ application,
+ final CommittedBundle<KV<K, WindowedValue<V>>> inputBundle,
+ final InProcessEvaluationContext evaluationContext) {
+ return new InProcessGroupByKeyOnlyEvaluator<K, V>(evaluationContext, inputBundle, application);
+ }
+
+ /**
+ * A transform evaluator for the pseudo-primitive {@link GroupByKeyOnly}. Windowing is ignored;
+ * all input should be in the global window since all output will be as well.
+ *
+ * @see GroupByKeyViaGroupByKeyOnly
+ */
+ private static class InProcessGroupByKeyOnlyEvaluator<K, V>
+ implements TransformEvaluator<KV<K, WindowedValue<V>>> {
+ private final InProcessEvaluationContext evaluationContext;
+
+ private final CommittedBundle<KV<K, WindowedValue<V>>> inputBundle;
+ private final AppliedPTransform<
+ PCollection<KV<K, WindowedValue<V>>>, PCollection<KeyedWorkItem<K, V>>,
+ InProcessGroupByKeyOnly<K, V>>
+ application;
+ private final Coder<K> keyCoder;
+ private Map<GroupingKey<K>, List<WindowedValue<V>>> groupingMap;
+
+ public InProcessGroupByKeyOnlyEvaluator(
+ InProcessEvaluationContext evaluationContext,
+ CommittedBundle<KV<K, WindowedValue<V>>> inputBundle,
+ AppliedPTransform<
+ PCollection<KV<K, WindowedValue<V>>>, PCollection<KeyedWorkItem<K, V>>,
+ InProcessGroupByKeyOnly<K, V>>
+ application) {
+ this.evaluationContext = evaluationContext;
+ this.inputBundle = inputBundle;
+ this.application = application;
+ this.keyCoder = getKeyCoder(application.getInput().getCoder());
+ this.groupingMap = new HashMap<>();
+ }
+
+ private Coder<K> getKeyCoder(Coder<KV<K, WindowedValue<V>>> coder) {
+   // The input coder must be a KvCoder; its key coder is what gets returned.
+   checkState(
+       coder instanceof KvCoder,
+       "%s requires a coder of class %s."
+           + " This is an internal error; this is checked during pipeline construction"
+           + " but became corrupted.",
+       getClass().getSimpleName(), KvCoder.class.getSimpleName());
+   @SuppressWarnings("unchecked")
+   KvCoder<K, WindowedValue<V>> kvCoder = (KvCoder<K, WindowedValue<V>>) coder;
+   return kvCoder.getKeyCoder();
+ }
+
+ @Override
+ public void processElement(WindowedValue<KV<K, WindowedValue<V>>> element) {
+   // Values are grouped under the encoded bytes of their key, not the key object itself.
+   KV<K, WindowedValue<V>> keyedValue = element.getValue();
+   K key = keyedValue.getKey();
+   GroupingKey<K> groupingKey;
+   try {
+     groupingKey = new GroupingKey<>(key, encodeToByteArray(keyCoder, key));
+   } catch (CoderException exn) {
+     // TODO: Put in better element printing:
+     // truncate if too long.
+     throw new IllegalArgumentException(
+         String.format("unable to encode key %s of input to %s using %s", key, this, keyCoder),
+         exn);
+   }
+   List<WindowedValue<V>> values = groupingMap.get(groupingKey);
+   if (values == null) {
+     values = new ArrayList<WindowedValue<V>>();
+     groupingMap.put(groupingKey, values);
+   }
+   values.add(keyedValue.getValue());
+ }
+
+ @Override
+ public InProcessTransformResult finishBundle() {
+   // Each grouped key gets its own keyed output bundle, holding one KeyedWorkItem that
+   // carries all values collected for that key, added in the global window.
+   Builder results = StepTransformResult.withoutHold(application);
+   for (Map.Entry<GroupingKey<K>, List<WindowedValue<V>>> entry : groupingMap.entrySet()) {
+     K key = entry.getKey().key;
+     KeyedWorkItem<K, V> workItem = KeyedWorkItems.elementsWorkItem(key, entry.getValue());
+     UncommittedBundle<KeyedWorkItem<K, V>> keyedBundle =
+         evaluationContext.createKeyedBundle(inputBundle, key, application.getOutput());
+     keyedBundle.add(WindowedValue.valueInGlobalWindow(workItem));
+     results.addOutput(keyedBundle);
+   }
+   return results.build();
+ }
+
+ private static class GroupingKey<K> {
+   // Map key whose equality and hash are defined by the encoded key bytes only.
+   private final K key;
+   private final byte[] encodedKey;
+
+   public GroupingKey(K key, byte[] encodedKey) {
+     this.key = key;
+     this.encodedKey = encodedKey;
+   }
+
+   @Override
+   public boolean equals(Object o) {
+     if (!(o instanceof GroupingKey)) {
+       return false;
+     }
+     // The decoded key does not take part in the comparison.
+     return Arrays.equals(this.encodedKey, ((GroupingKey<?>) o).encodedKey);
+   }
+
+   @Override
+   public int hashCode() {
+     return Arrays.hashCode(encodedKey);
+   }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aad284a5/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupByKeyOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupByKeyOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupByKeyOverrideFactory.java
new file mode 100644
index 0000000..1d84bc9
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupByKeyOverrideFactory.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+
+/**
+ * A {@link PTransformOverrideFactory} for {@link GroupByKey} PTransforms.
+ */
+final class InProcessGroupByKeyOverrideFactory
+ implements PTransformOverrideFactory {
+ @Override
+ public <InputT extends PInput, OutputT extends POutput> PTransform<InputT, OutputT> override(
+ PTransform<InputT, OutputT> transform) {
+ if (transform instanceof GroupByKey) {
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ PTransform<InputT, OutputT> override =
+ (PTransform) new InProcessGroupByKey((GroupByKey) transform);
+ return override;
+ }
+ return transform;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aad284a5/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessPipelineRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessPipelineRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessPipelineRunner.java
index 19e9f47..a7f6941 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessPipelineRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessPipelineRunner.java
@@ -17,8 +17,7 @@
*/
package org.apache.beam.runners.direct;
-import org.apache.beam.runners.direct.GroupByKeyEvaluatorFactory.InProcessGroupByKeyOnly;
-import org.apache.beam.runners.direct.GroupByKeyEvaluatorFactory.InProcessGroupByKeyOverrideFactory;
+import org.apache.beam.runners.direct.InProcessGroupByKey.InProcessGroupByKeyOnly;
import org.apache.beam.runners.direct.ViewEvaluatorFactory.InProcessViewOverrideFactory;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aad284a5/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
index f449731..81d2520 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
@@ -17,6 +17,8 @@
*/
package org.apache.beam.runners.direct;
+import org.apache.beam.runners.direct.InProcessGroupByKey.InProcessGroupAlsoByWindow;
+import org.apache.beam.runners.direct.InProcessGroupByKey.InProcessGroupByKeyOnly;
import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.transforms.AppliedPTransform;
@@ -44,12 +46,12 @@ class TransformEvaluatorRegistry implements TransformEvaluatorFactory {
.put(Read.Unbounded.class, new UnboundedReadEvaluatorFactory())
.put(ParDo.Bound.class, new ParDoSingleEvaluatorFactory())
.put(ParDo.BoundMulti.class, new ParDoMultiEvaluatorFactory())
- .put(
- GroupByKeyEvaluatorFactory.InProcessGroupByKeyOnly.class,
- new GroupByKeyEvaluatorFactory())
.put(FlattenPCollectionList.class, new FlattenEvaluatorFactory())
.put(ViewEvaluatorFactory.WriteView.class, new ViewEvaluatorFactory())
.put(Window.Bound.class, new WindowEvaluatorFactory())
+ // Runner-specific primitives used in expansion of GroupByKey
+ .put(InProcessGroupByKeyOnly.class, new InProcessGroupByKeyOnlyEvaluatorFactory())
+ .put(InProcessGroupAlsoByWindow.class, new InProcessGroupAlsoByWindowEvaluatorFactory())
.build();
return new TransformEvaluatorRegistry(primitives);
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aad284a5/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java
index 267266d..92f845c 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java
@@ -67,7 +67,7 @@ public class GroupByKeyEvaluatorFactoryTest {
PCollection<KV<String, WindowedValue<Integer>>> kvs =
values.apply(new ReifyTimestampsAndWindows<String, Integer>());
PCollection<KeyedWorkItem<String, Integer>> groupedKvs =
- kvs.apply(new GroupByKeyEvaluatorFactory.InProcessGroupByKeyOnly<String, Integer>());
+ kvs.apply(new InProcessGroupByKey.InProcessGroupByKeyOnly<String, Integer>());
CommittedBundle<KV<String, WindowedValue<Integer>>> inputBundle =
bundleFactory.createRootBundle(kvs).commit(Instant.now());
@@ -89,7 +89,7 @@ public class GroupByKeyEvaluatorFactoryTest {
Coder<String> keyCoder =
((KvCoder<String, WindowedValue<Integer>>) kvs.getCoder()).getKeyCoder();
TransformEvaluator<KV<String, WindowedValue<Integer>>> evaluator =
- new GroupByKeyEvaluatorFactory()
+ new InProcessGroupByKeyOnlyEvaluatorFactory()
.forApplication(
groupedKvs.getProducingTransformInternal(), inputBundle, evaluationContext);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aad284a5/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessGroupByKeyOnlyEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessGroupByKeyOnlyEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessGroupByKeyOnlyEvaluatorFactoryTest.java
new file mode 100644
index 0000000..1172a4d
--- /dev/null
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessGroupByKeyOnlyEvaluatorFactoryTest.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import static org.hamcrest.Matchers.contains;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly.ReifyTimestampsAndWindows;
+import org.apache.beam.sdk.util.KeyedWorkItem;
+import org.apache.beam.sdk.util.KeyedWorkItems;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+
+import com.google.common.collect.HashMultiset;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Multiset;
+
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link InProcessGroupByKeyOnlyEvaluatorFactory}.
+ */
+@RunWith(JUnit4.class)
+public class InProcessGroupByKeyOnlyEvaluatorFactoryTest {
+ private BundleFactory bundleFactory = InProcessBundleFactory.create();
+
+ @Test
+ public void testInMemoryEvaluator() throws Exception {
+ TestPipeline p = TestPipeline.create();
+ KV<String, Integer> firstFoo = KV.of("foo", -1);
+ KV<String, Integer> secondFoo = KV.of("foo", 1);
+ KV<String, Integer> thirdFoo = KV.of("foo", 3);
+ KV<String, Integer> firstBar = KV.of("bar", 22);
+ KV<String, Integer> secondBar = KV.of("bar", 12);
+ KV<String, Integer> firstBaz = KV.of("baz", Integer.MAX_VALUE);
+ PCollection<KV<String, Integer>> values =
+ p.apply(Create.of(firstFoo, firstBar, secondFoo, firstBaz, secondBar, thirdFoo));
+ PCollection<KV<String, WindowedValue<Integer>>> kvs =
+ values.apply(new ReifyTimestampsAndWindows<String, Integer>());
+ PCollection<KeyedWorkItem<String, Integer>> groupedKvs =
+ kvs.apply(new InProcessGroupByKey.InProcessGroupByKeyOnly<String, Integer>());
+
+ CommittedBundle<KV<String, WindowedValue<Integer>>> inputBundle =
+ bundleFactory.createRootBundle(kvs).commit(Instant.now());
+ InProcessEvaluationContext evaluationContext = mock(InProcessEvaluationContext.class);
+
+ UncommittedBundle<KeyedWorkItem<String, Integer>> fooBundle =
+ bundleFactory.createKeyedBundle(null, "foo", groupedKvs);
+ UncommittedBundle<KeyedWorkItem<String, Integer>> barBundle =
+ bundleFactory.createKeyedBundle(null, "bar", groupedKvs);
+ UncommittedBundle<KeyedWorkItem<String, Integer>> bazBundle =
+ bundleFactory.createKeyedBundle(null, "baz", groupedKvs);
+
+ when(evaluationContext.createKeyedBundle(inputBundle, "foo", groupedKvs)).thenReturn(fooBundle);
+ when(evaluationContext.createKeyedBundle(inputBundle, "bar", groupedKvs)).thenReturn(barBundle);
+ when(evaluationContext.createKeyedBundle(inputBundle, "baz", groupedKvs)).thenReturn(bazBundle);
+
+ // The input to a GroupByKey is assumed to be a KvCoder
+ @SuppressWarnings("unchecked")
+ Coder<String> keyCoder =
+ ((KvCoder<String, WindowedValue<Integer>>) kvs.getCoder()).getKeyCoder();
+ TransformEvaluator<KV<String, WindowedValue<Integer>>> evaluator =
+ new InProcessGroupByKeyOnlyEvaluatorFactory()
+ .forApplication(
+ groupedKvs.getProducingTransformInternal(), inputBundle, evaluationContext);
+
+ evaluator.processElement(WindowedValue.valueInEmptyWindows(gwValue(firstFoo)));
+ evaluator.processElement(WindowedValue.valueInEmptyWindows(gwValue(secondFoo)));
+ evaluator.processElement(WindowedValue.valueInEmptyWindows(gwValue(thirdFoo)));
+ evaluator.processElement(WindowedValue.valueInEmptyWindows(gwValue(firstBar)));
+ evaluator.processElement(WindowedValue.valueInEmptyWindows(gwValue(secondBar)));
+ evaluator.processElement(WindowedValue.valueInEmptyWindows(gwValue(firstBaz)));
+
+ evaluator.finishBundle();
+
+ assertThat(
+ fooBundle.commit(Instant.now()).getElements(),
+ contains(
+ new KeyedWorkItemMatcher<String, Integer>(
+ KeyedWorkItems.elementsWorkItem(
+ "foo",
+ ImmutableSet.of(
+ WindowedValue.valueInGlobalWindow(-1),
+ WindowedValue.valueInGlobalWindow(1),
+ WindowedValue.valueInGlobalWindow(3))),
+ keyCoder)));
+ assertThat(
+ barBundle.commit(Instant.now()).getElements(),
+ contains(
+ new KeyedWorkItemMatcher<String, Integer>(
+ KeyedWorkItems.elementsWorkItem(
+ "bar",
+ ImmutableSet.of(
+ WindowedValue.valueInGlobalWindow(12),
+ WindowedValue.valueInGlobalWindow(22))),
+ keyCoder)));
+ assertThat(
+ bazBundle.commit(Instant.now()).getElements(),
+ contains(
+ new KeyedWorkItemMatcher<String, Integer>(
+ KeyedWorkItems.elementsWorkItem(
+ "baz",
+ ImmutableSet.of(WindowedValue.valueInGlobalWindow(Integer.MAX_VALUE))),
+ keyCoder)));
+ }
+
+ private <K, V> KV<K, WindowedValue<V>> gwValue(KV<K, V> kv) {
+ return KV.of(kv.getKey(), WindowedValue.valueInGlobalWindow(kv.getValue()));
+ }
+
+ private static class KeyedWorkItemMatcher<K, V>
+ extends BaseMatcher<WindowedValue<KeyedWorkItem<K, V>>> {
+ private final KeyedWorkItem<K, V> myWorkItem;
+ private final Coder<K> keyCoder;
+
+ public KeyedWorkItemMatcher(KeyedWorkItem<K, V> myWorkItem, Coder<K> keyCoder) {
+ this.myWorkItem = myWorkItem;
+ this.keyCoder = keyCoder;
+ }
+
+ @Override
+ public boolean matches(Object item) {
+   // instanceof is already false for null, so no separate null test is needed.
+   if (!(item instanceof WindowedValue)) {
+     return false;
+   }
+   @SuppressWarnings("unchecked")
+   WindowedValue<KeyedWorkItem<K, V>> that = (WindowedValue<KeyedWorkItem<K, V>>) item;
+   // Both sides are compared as multisets, so element order does not matter.
+   Multiset<WindowedValue<V>> myValues =
+       HashMultiset.create(myWorkItem.elementsIterable());
+   Multiset<WindowedValue<V>> thatValues =
+       HashMultiset.create(that.getValue().elementsIterable());
+   try {
+     return myValues.equals(thatValues)
+         && keyCoder
+             .structuralValue(myWorkItem.key())
+             .equals(keyCoder.structuralValue(that.getValue().key()));
+   } catch (Exception e) {
+     // Any exception while comparing is reported as a non-match.
+     return false;
+   }
+ }
+
+ @Override
+ public void describeTo(Description description) {
+ description
+ .appendText("KeyedWorkItem<K, V> containing key ")
+ .appendValue(myWorkItem.key())
+ .appendText(" and values ")
+ .appendValueList("[", ", ", "]", myWorkItem.elementsIterable());
+ }
+ }
+}
[3/3] incubator-beam git commit: This closes #268
Posted by ke...@apache.org.
This closes #268
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2f18cd26
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2f18cd26
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2f18cd26
Branch: refs/heads/master
Commit: 2f18cd2687a4dad0510bb29cd62c9dd2323ee70b
Parents: f184bcf aad284a
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu May 19 11:53:49 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu May 19 11:53:49 2016 -0700
----------------------------------------------------------------------
.../beam/runners/direct/BundleFactory.java | 2 +-
.../direct/GroupByKeyEvaluatorFactory.java | 274 -------------------
.../direct/InProcessEvaluationContext.java | 2 +-
...rocessGroupAlsoByWindowEvaluatorFactory.java | 127 +++++++++
.../runners/direct/InProcessGroupByKey.java | 132 +++++++++
...InProcessGroupByKeyOnlyEvaluatorFactory.java | 183 +++++++++++++
.../InProcessGroupByKeyOverrideFactory.java | 41 +++
.../runners/direct/InProcessPipelineRunner.java | 3 +-
.../direct/TransformEvaluatorRegistry.java | 8 +-
.../direct/GroupByKeyEvaluatorFactoryTest.java | 4 +-
...ocessGroupByKeyOnlyEvaluatorFactoryTest.java | 183 +++++++++++++
.../beam/sdk/util/KeyedWorkItemCoder.java | 8 +
12 files changed, 684 insertions(+), 283 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2f18cd26/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java
----------------------------------------------------------------------