You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/04/05 23:00:21 UTC
[1/3] incubator-beam git commit: Move GroupByKey expansion into
DirectPipelineRunner
Repository: incubator-beam
Updated Branches:
refs/heads/master 6c34f3a34 -> c26eef5be
Move GroupByKey expansion into DirectPipelineRunner
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ca5b2def
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ca5b2def
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ca5b2def
Branch: refs/heads/master
Commit: ca5b2def2bf77bd841fc670b167fe7ba37801603
Parents: 706fc53
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Mar 24 15:26:37 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue Apr 5 13:54:03 2016 -0700
----------------------------------------------------------------------
.../FlinkBatchTransformTranslators.java | 26 +-
.../beam/runners/spark/SparkPipelineRunner.java | 20 ++
.../spark/translation/TransformTranslator.java | 22 +-
.../sdk/runners/DirectPipelineRunner.java | 116 +++++++
.../inprocess/GroupByKeyEvaluatorFactory.java | 2 +-
.../dataflow/sdk/transforms/GroupByKey.java | 310 +------------------
.../sdk/util/GroupByKeyViaGroupByKeyOnly.java | 245 +++++++++++++++
.../cloud/dataflow/sdk/util/ReduceFnRunner.java | 1 -
.../GroupByKeyEvaluatorFactoryTest.java | 4 +-
9 files changed, 404 insertions(+), 342 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ca5b2def/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
index 48c783d..b3c0cea 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
@@ -96,7 +96,7 @@ public class FlinkBatchTransformTranslators {
// --------------------------------------------------------------------------------------------
// Transform Translator Registry
// --------------------------------------------------------------------------------------------
-
+
@SuppressWarnings("rawtypes")
private static final Map<Class<? extends PTransform>, FlinkBatchPipelineTranslator.BatchTransformTranslator> TRANSLATORS = new HashMap<>();
@@ -112,7 +112,6 @@ public class FlinkBatchTransformTranslators {
TRANSLATORS.put(Flatten.FlattenPCollectionList.class, new FlattenPCollectionTranslatorBatch());
- TRANSLATORS.put(GroupByKey.GroupByKeyOnly.class, new GroupByKeyOnlyTranslatorBatch());
// TODO we're currently ignoring windows here but that has to change in the future
TRANSLATORS.put(GroupByKey.class, new GroupByKeyTranslatorBatch());
@@ -302,25 +301,8 @@ public class FlinkBatchTransformTranslators {
}
}
- private static class GroupByKeyOnlyTranslatorBatch<K, V> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<GroupByKey.GroupByKeyOnly<K, V>> {
-
- @Override
- public void translateNode(GroupByKey.GroupByKeyOnly<K, V> transform, FlinkBatchTranslationContext context) {
- DataSet<KV<K, V>> inputDataSet = context.getInputDataSet(context.getInput(transform));
- GroupReduceFunction<KV<K, V>, KV<K, Iterable<V>>> groupReduceFunction = new FlinkKeyedListAggregationFunction<>();
-
- TypeInformation<KV<K, Iterable<V>>> typeInformation = context.getTypeInfo(context.getOutput(transform));
-
- Grouping<KV<K, V>> grouping = new UnsortedGrouping<>(inputDataSet, new Keys.ExpressionKeys<>(new String[]{"key"}, inputDataSet.getType()));
-
- GroupReduceOperator<KV<K, V>, KV<K, Iterable<V>>> outputDataSet =
- new GroupReduceOperator<>(grouping, typeInformation, groupReduceFunction, transform.getName());
- context.setOutputDataSet(context.getOutput(transform), outputDataSet);
- }
- }
-
/**
- * Translates a GroupByKey while ignoring window assignments. This is identical to the {@link GroupByKeyOnlyTranslatorBatch}
+ * Translates a GroupByKey while ignoring window assignments (windowing is not yet supported).
*/
private static class GroupByKeyTranslatorBatch<K, V> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<GroupByKey<K, V>> {
@@ -406,7 +388,7 @@ public class FlinkBatchTransformTranslators {
// context.setOutputDataSet(transform.getOutput(), outputDataSet);
// }
// }
-
+
private static class ParDoBoundTranslatorBatch<IN, OUT> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<ParDo.Bound<IN, OUT>> {
private static final Logger LOG = LoggerFactory.getLogger(ParDoBoundTranslatorBatch.class);
@@ -589,6 +571,6 @@ public class FlinkBatchTransformTranslators {
// --------------------------------------------------------------------------------------------
// Miscellaneous
// --------------------------------------------------------------------------------------------
-
+
private FlinkBatchTransformTranslators() {}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ca5b2def/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
index d5e4186..71e358c 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineRunner.java
@@ -23,7 +23,10 @@ import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsValidator;
import com.google.cloud.dataflow.sdk.runners.PipelineRunner;
import com.google.cloud.dataflow.sdk.runners.TransformTreeNode;
+import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.util.GroupByKeyViaGroupByKeyOnly;
+import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.cloud.dataflow.sdk.values.PInput;
import com.google.cloud.dataflow.sdk.values.POutput;
import com.google.cloud.dataflow.sdk.values.PValue;
@@ -105,6 +108,23 @@ public final class SparkPipelineRunner extends PipelineRunner<EvaluationResult>
}
/**
+ * Expands {@link GroupByKey} into {@link GroupByKeyViaGroupByKeyOnly}; defers all else to super.
+ */
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ @Override
+ public <OT extends POutput, IT extends PInput> OT apply(
+ PTransform<IT, OT> transform, IT input) {
+
+ if (transform instanceof GroupByKey) {
+ return (OT) ((PCollection) input).apply(
+ new GroupByKeyViaGroupByKeyOnly((GroupByKey) transform));
+ } else {
+ return super.apply(transform, input);
+ }
+ }
+
+
+ /**
* No parameter constructor defaults to running this pipeline in Spark's local mode, in a single
* thread.
*/
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ca5b2def/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
index 7f72235..ac59d00 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
@@ -18,11 +18,6 @@
package org.apache.beam.runners.spark.translation;
-import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.getOutputDirectory;
-import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.getOutputFilePrefix;
-import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.getOutputFileTemplate;
-import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.replaceShardCount;
-
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.Arrays;
@@ -30,7 +25,11 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
-import com.google.api.client.util.Lists;
+import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.getOutputDirectory;
+import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.getOutputFilePrefix;
+import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.getOutputFileTemplate;
+import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.replaceShardCount;
+
import com.google.api.client.util.Maps;
import com.google.cloud.dataflow.sdk.coders.CannotProvideCoderException;
import com.google.cloud.dataflow.sdk.coders.Coder;
@@ -41,7 +40,6 @@ import com.google.cloud.dataflow.sdk.transforms.Combine;
import com.google.cloud.dataflow.sdk.transforms.Create;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.Flatten;
-import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.transforms.View;
@@ -50,6 +48,7 @@ import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindows;
import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn;
import com.google.cloud.dataflow.sdk.util.AssignWindowsDoFn;
+import com.google.cloud.dataflow.sdk.util.GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly;
import com.google.cloud.dataflow.sdk.util.WindowedValue;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PCollection;
@@ -82,6 +81,7 @@ import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
+
import scala.Tuple2;
/**
@@ -130,10 +130,10 @@ public final class TransformTranslator {
};
}
- private static <K, V> TransformEvaluator<GroupByKey.GroupByKeyOnly<K, V>> gbk() {
- return new TransformEvaluator<GroupByKey.GroupByKeyOnly<K, V>>() {
+ private static <K, V> TransformEvaluator<GroupByKeyOnly<K, V>> gbk() {
+ return new TransformEvaluator<GroupByKeyOnly<K, V>>() {
@Override
- public void evaluate(GroupByKey.GroupByKeyOnly<K, V> transform, EvaluationContext context) {
+ public void evaluate(GroupByKeyOnly<K, V> transform, EvaluationContext context) {
@SuppressWarnings("unchecked")
JavaRDDLike<WindowedValue<KV<K, V>>, ?> inRDD =
(JavaRDDLike<WindowedValue<KV<K, V>>, ?>) context.getInputRDD(transform);
@@ -776,7 +776,7 @@ public final class TransformTranslator {
EVALUATORS.put(HadoopIO.Write.Bound.class, writeHadoop());
EVALUATORS.put(ParDo.Bound.class, parDo());
EVALUATORS.put(ParDo.BoundMulti.class, multiDo());
- EVALUATORS.put(GroupByKey.GroupByKeyOnly.class, gbk());
+ EVALUATORS.put(GroupByKeyOnly.class, gbk());
EVALUATORS.put(Combine.GroupedValues.class, grouped());
EVALUATORS.put(Combine.Globally.class, combineGlobally());
EVALUATORS.put(Combine.PerKey.class, combinePerKey());
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ca5b2def/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DirectPipelineRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DirectPipelineRunner.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DirectPipelineRunner.java
index 872cfef..417420a 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DirectPipelineRunner.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DirectPipelineRunner.java
@@ -16,6 +16,7 @@
package com.google.cloud.dataflow.sdk.runners;
+import static com.google.cloud.dataflow.sdk.util.CoderUtils.encodeToByteArray;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
@@ -24,6 +25,7 @@ import com.google.cloud.dataflow.sdk.Pipeline.PipelineVisitor;
import com.google.cloud.dataflow.sdk.PipelineResult;
import com.google.cloud.dataflow.sdk.coders.CannotProvideCoderException;
import com.google.cloud.dataflow.sdk.coders.Coder;
+import com.google.cloud.dataflow.sdk.coders.CoderException;
import com.google.cloud.dataflow.sdk.coders.ListCoder;
import com.google.cloud.dataflow.sdk.io.AvroIO;
import com.google.cloud.dataflow.sdk.io.FileBasedSink;
@@ -38,12 +40,15 @@ import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
import com.google.cloud.dataflow.sdk.transforms.Combine;
import com.google.cloud.dataflow.sdk.transforms.Combine.KeyedCombineFn;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.transforms.Partition;
import com.google.cloud.dataflow.sdk.transforms.Partition.PartitionFn;
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
import com.google.cloud.dataflow.sdk.util.AppliedCombineFn;
+import com.google.cloud.dataflow.sdk.util.GroupByKeyViaGroupByKeyOnly;
+import com.google.cloud.dataflow.sdk.util.GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly;
import com.google.cloud.dataflow.sdk.util.IOChannelUtils;
import com.google.cloud.dataflow.sdk.util.MapAggregatorValues;
import com.google.cloud.dataflow.sdk.util.PerKeyCombineFnRunner;
@@ -71,6 +76,7 @@ import org.slf4j.LoggerFactory;
import java.io.Serializable;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -138,6 +144,8 @@ public class DirectPipelineRunner
}
}
+ /////////////////////////////////////////////////////////////////////////////
+
/**
* Records that instances of the specified PTransform class
* should be evaluated by the corresponding TransformEvaluator.
@@ -243,6 +251,9 @@ public class DirectPipelineRunner
return (OutputT) applyTextIOWrite((TextIO.Write.Bound) transform, (PCollection<?>) input);
} else if (transform instanceof AvroIO.Write.Bound) {
return (OutputT) applyAvroIOWrite((AvroIO.Write.Bound) transform, (PCollection<?>) input);
+ } else if (transform instanceof GroupByKey) {
+ return (OutputT)
+ ((PCollection) input).apply(new GroupByKeyViaGroupByKeyOnly((GroupByKey) transform));
} else {
return super.apply(transform, input);
}
@@ -1117,6 +1128,39 @@ public class DirectPipelineRunner
/////////////////////////////////////////////////////////////////////////////
+ /**
+ * The key by which GBK groups inputs: elements are grouped by the encoded form of the key,
+ * but the original key may be accessed as well. Immutable because it is used as a HashMap key.
+ */
+ private static class GroupingKey<K> {
+ private final K key;
+ private final byte[] encodedKey;
+
+ public GroupingKey(K key, byte[] encodedKey) {
+ this.key = key;
+ this.encodedKey = encodedKey;
+ }
+
+ public K getKey() {
+ return key;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o instanceof GroupingKey) {
+ GroupingKey<?> that = (GroupingKey<?>) o;
+ return Arrays.equals(this.encodedKey, that.encodedKey);
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ return Arrays.hashCode(encodedKey);
+ }
+ }
+
private final DirectPipelineOptions options;
private boolean testSerializability;
private boolean testEncodability;
@@ -1153,4 +1197,76 @@ public class DirectPipelineRunner
public String toString() {
return "DirectPipelineRunner#" + hashCode();
}
+
+ public static <K, V> void evaluateGroupByKeyOnly(
+ GroupByKeyOnly<K, V> transform,
+ EvaluationContext context) {
+ PCollection<KV<K, V>> input = context.getInput(transform);
+
+ List<ValueWithMetadata<KV<K, V>>> inputElems =
+ context.getPCollectionValuesWithMetadata(input);
+
+ Coder<K> keyCoder = GroupByKey.getKeyCoder(input.getCoder());
+
+ Map<GroupingKey<K>, List<V>> groupingMap = new HashMap<>();
+
+ for (ValueWithMetadata<KV<K, V>> elem : inputElems) {
+ K key = elem.getValue().getKey();
+ V value = elem.getValue().getValue();
+ byte[] encodedKey;
+ try {
+ encodedKey = encodeToByteArray(keyCoder, key);
+ } catch (CoderException exn) {
+ // TODO: Put in better element printing:
+ // truncate if too long.
+ throw new IllegalArgumentException(
+ "unable to encode key " + key + " of input to " + transform +
+ " using " + keyCoder,
+ exn);
+ }
+ GroupingKey<K> groupingKey =
+ new GroupingKey<>(key, encodedKey);
+ List<V> values = groupingMap.get(groupingKey);
+ if (values == null) {
+ values = new ArrayList<V>();
+ groupingMap.put(groupingKey, values);
+ }
+ values.add(value);
+ }
+
+ List<ValueWithMetadata<KV<K, Iterable<V>>>> outputElems =
+ new ArrayList<>();
+ for (Map.Entry<GroupingKey<K>, List<V>> entry : groupingMap.entrySet()) {
+ GroupingKey<K> groupingKey = entry.getKey();
+ K key = groupingKey.getKey();
+ List<V> values = entry.getValue();
+ values = context.randomizeIfUnordered(values, true /* inPlaceAllowed */);
+ outputElems.add(ValueWithMetadata
+ .of(WindowedValue.valueInEmptyWindows(KV.<K, Iterable<V>>of(key, values)))
+ .withKey(key));
+ }
+
+ context.setPCollectionValuesWithMetadata(context.getOutput(transform),
+ outputElems);
+ }
+
+ /** Registers {@link #evaluateGroupByKeyOnly} as the evaluator for {@link GroupByKeyOnly}. */
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ public static void registerGroupByKeyOnly() {
+ registerDefaultTransformEvaluator(
+ GroupByKeyOnly.class,
+ new TransformEvaluator<GroupByKeyOnly>() {
+ @Override
+ public void evaluate(
+ GroupByKeyOnly transform,
+ EvaluationContext context) {
+ evaluateGroupByKeyOnly(transform, context);
+ }
+ });
+ }
+
+ static {
+ registerGroupByKeyOnly();
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ca5b2def/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactory.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactory.java
index 3ec4af1..4f97db0 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactory.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactory.java
@@ -27,11 +27,11 @@ import com.google.cloud.dataflow.sdk.runners.inprocess.StepTransformResult.Build
import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
-import com.google.cloud.dataflow.sdk.transforms.GroupByKey.ReifyTimestampsAndWindows;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
import com.google.cloud.dataflow.sdk.util.GroupAlsoByWindowViaWindowSetDoFn;
+import com.google.cloud.dataflow.sdk.util.GroupByKeyViaGroupByKeyOnly.ReifyTimestampsAndWindows;
import com.google.cloud.dataflow.sdk.util.KeyedWorkItem;
import com.google.cloud.dataflow.sdk.util.KeyedWorkItemCoder;
import com.google.cloud.dataflow.sdk.util.KeyedWorkItems;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ca5b2def/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/GroupByKey.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/GroupByKey.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/GroupByKey.java
index 8fde3e0..490269b 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/GroupByKey.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/GroupByKey.java
@@ -16,40 +16,20 @@
package com.google.cloud.dataflow.sdk.transforms;
-import static com.google.cloud.dataflow.sdk.util.CoderUtils.encodeToByteArray;
-
import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.coders.Coder.NonDeterministicException;
-import com.google.cloud.dataflow.sdk.coders.CoderException;
import com.google.cloud.dataflow.sdk.coders.IterableCoder;
import com.google.cloud.dataflow.sdk.coders.KvCoder;
-import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner;
-import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.ValueWithMetadata;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
import com.google.cloud.dataflow.sdk.transforms.windowing.DefaultTrigger;
import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindows;
import com.google.cloud.dataflow.sdk.transforms.windowing.InvalidWindows;
import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn;
-import com.google.cloud.dataflow.sdk.util.GroupAlsoByWindowsViaOutputBufferDoFn;
-import com.google.cloud.dataflow.sdk.util.ReifyTimestampAndWindowsDoFn;
-import com.google.cloud.dataflow.sdk.util.SystemReduceFn;
-import com.google.cloud.dataflow.sdk.util.WindowedValue;
-import com.google.cloud.dataflow.sdk.util.WindowedValue.FullWindowedValueCoder;
-import com.google.cloud.dataflow.sdk.util.WindowedValue.WindowedValueCoder;
import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.cloud.dataflow.sdk.values.PCollection.IsBounded;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
/**
* {@code GroupByKey<K, V>} takes a {@code PCollection<KV<K, V>>},
* groups the values by key and windows, and returns a
@@ -234,34 +214,12 @@ public class GroupByKey<K, V>
@Override
public PCollection<KV<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) {
- // This operation groups by the combination of key and window,
+ // This primitive operation groups by the combination of key and window,
// merging windows as needed, using the windows assigned to the
// key/value input elements and the window merge operation of the
// window function associated with the input PCollection.
- WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
-
- // By default, implement GroupByKey[AndWindow] via a series of lower-level
- // operations.
- return input
- // Make each input element's timestamp and assigned windows
- // explicit, in the value part.
- .apply(new ReifyTimestampsAndWindows<K, V>())
-
- // Group by just the key.
- // Combiner lifting will not happen regardless of the disallowCombinerLifting value.
- // There will be no combiners right after the GroupByKeyOnly because of the two ParDos
- // introduced in here.
- .apply(new GroupByKeyOnly<K, WindowedValue<V>>())
-
- // Sort each key's values by timestamp. GroupAlsoByWindow requires
- // its input to be sorted by timestamp.
- .apply(new SortValuesByTimestamp<K, V>())
-
- // Group each key's values by window, merging windows as needed.
- .apply(new GroupAlsoByWindow<K, V>(windowingStrategy))
-
- // And update the windowing strategy as appropriate.
- .setWindowingStrategyInternal(updateWindowingStrategy(windowingStrategy));
+ return PCollection.createPrimitiveOutputInternal(input.getPipeline(),
+ updateWindowingStrategy(input.getWindowingStrategy()), input.isBounded());
}
@Override
@@ -289,7 +247,7 @@ public class GroupByKey<K, V>
* transform, which is also used as the {@code Coder} of the keys of
* the output of this transform.
*/
- static <K, V> Coder<K> getKeyCoder(Coder<KV<K, V>> inputCoder) {
+ public static <K, V> Coder<K> getKeyCoder(Coder<KV<K, V>> inputCoder) {
return getInputKvCoder(inputCoder).getKeyCoder();
}
@@ -311,265 +269,7 @@ public class GroupByKey<K, V>
/**
* Returns the {@code Coder} of the output of this transform.
*/
- static <K, V> KvCoder<K, Iterable<V>> getOutputKvCoder(Coder<KV<K, V>> inputCoder) {
+ public static <K, V> KvCoder<K, Iterable<V>> getOutputKvCoder(Coder<KV<K, V>> inputCoder) {
return KvCoder.of(getKeyCoder(inputCoder), getOutputValueCoder(inputCoder));
}
-
- /////////////////////////////////////////////////////////////////////////////
-
- /**
- * Helper transform that makes timestamps and window assignments
- * explicit in the value part of each key/value pair.
- */
- public static class ReifyTimestampsAndWindows<K, V>
- extends PTransform<PCollection<KV<K, V>>,
- PCollection<KV<K, WindowedValue<V>>>> {
- @Override
- public PCollection<KV<K, WindowedValue<V>>> apply(
- PCollection<KV<K, V>> input) {
- @SuppressWarnings("unchecked")
- KvCoder<K, V> inputKvCoder = (KvCoder<K, V>) input.getCoder();
- Coder<K> keyCoder = inputKvCoder.getKeyCoder();
- Coder<V> inputValueCoder = inputKvCoder.getValueCoder();
- Coder<WindowedValue<V>> outputValueCoder = FullWindowedValueCoder.of(
- inputValueCoder, input.getWindowingStrategy().getWindowFn().windowCoder());
- Coder<KV<K, WindowedValue<V>>> outputKvCoder =
- KvCoder.of(keyCoder, outputValueCoder);
- return input.apply(ParDo.of(new ReifyTimestampAndWindowsDoFn<K, V>()))
- .setCoder(outputKvCoder);
- }
- }
-
-
- /////////////////////////////////////////////////////////////////////////////
-
- /**
- * Helper transform that sorts the values associated with each key
- * by timestamp.
- */
- public static class SortValuesByTimestamp<K, V>
- extends PTransform<PCollection<KV<K, Iterable<WindowedValue<V>>>>,
- PCollection<KV<K, Iterable<WindowedValue<V>>>>> {
- @Override
- public PCollection<KV<K, Iterable<WindowedValue<V>>>> apply(
- PCollection<KV<K, Iterable<WindowedValue<V>>>> input) {
- return input.apply(ParDo.of(
- new DoFn<KV<K, Iterable<WindowedValue<V>>>,
- KV<K, Iterable<WindowedValue<V>>>>() {
- @Override
- public void processElement(ProcessContext c) {
- KV<K, Iterable<WindowedValue<V>>> kvs = c.element();
- K key = kvs.getKey();
- Iterable<WindowedValue<V>> unsortedValues = kvs.getValue();
- List<WindowedValue<V>> sortedValues = new ArrayList<>();
- for (WindowedValue<V> value : unsortedValues) {
- sortedValues.add(value);
- }
- Collections.sort(sortedValues,
- new Comparator<WindowedValue<V>>() {
- @Override
- public int compare(WindowedValue<V> e1, WindowedValue<V> e2) {
- return e1.getTimestamp().compareTo(e2.getTimestamp());
- }
- });
- c.output(KV.<K, Iterable<WindowedValue<V>>>of(key, sortedValues));
- }}))
- .setCoder(input.getCoder());
- }
- }
-
-
- /////////////////////////////////////////////////////////////////////////////
-
- /**
- * Helper transform that takes a collection of timestamp-ordered
- * values associated with each key, groups the values by window,
- * combines windows as needed, and for each window in each key,
- * outputs a collection of key/value-list pairs implicitly assigned
- * to the window and with the timestamp derived from that window.
- */
- public static class GroupAlsoByWindow<K, V>
- extends PTransform<PCollection<KV<K, Iterable<WindowedValue<V>>>>,
- PCollection<KV<K, Iterable<V>>>> {
- private final WindowingStrategy<?, ?> windowingStrategy;
-
- public GroupAlsoByWindow(WindowingStrategy<?, ?> windowingStrategy) {
- this.windowingStrategy = windowingStrategy;
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public PCollection<KV<K, Iterable<V>>> apply(
- PCollection<KV<K, Iterable<WindowedValue<V>>>> input) {
- @SuppressWarnings("unchecked")
- KvCoder<K, Iterable<WindowedValue<V>>> inputKvCoder =
- (KvCoder<K, Iterable<WindowedValue<V>>>) input.getCoder();
-
- Coder<K> keyCoder = inputKvCoder.getKeyCoder();
- Coder<Iterable<WindowedValue<V>>> inputValueCoder =
- inputKvCoder.getValueCoder();
-
- IterableCoder<WindowedValue<V>> inputIterableValueCoder =
- (IterableCoder<WindowedValue<V>>) inputValueCoder;
- Coder<WindowedValue<V>> inputIterableElementCoder =
- inputIterableValueCoder.getElemCoder();
- WindowedValueCoder<V> inputIterableWindowedValueCoder =
- (WindowedValueCoder<V>) inputIterableElementCoder;
-
- Coder<V> inputIterableElementValueCoder =
- inputIterableWindowedValueCoder.getValueCoder();
- Coder<Iterable<V>> outputValueCoder =
- IterableCoder.of(inputIterableElementValueCoder);
- Coder<KV<K, Iterable<V>>> outputKvCoder = KvCoder.of(keyCoder, outputValueCoder);
-
- return input
- .apply(ParDo.of(groupAlsoByWindowsFn(windowingStrategy, inputIterableElementValueCoder)))
- .setCoder(outputKvCoder);
- }
-
- private <W extends BoundedWindow> GroupAlsoByWindowsViaOutputBufferDoFn<K, V, Iterable<V>, W>
- groupAlsoByWindowsFn(
- WindowingStrategy<?, W> strategy, Coder<V> inputIterableElementValueCoder) {
- return new GroupAlsoByWindowsViaOutputBufferDoFn<K, V, Iterable<V>, W>(
- strategy, SystemReduceFn.<K, V, W>buffering(inputIterableElementValueCoder));
- }
- }
-
-
- /////////////////////////////////////////////////////////////////////////////
-
- /**
- * Primitive helper transform that groups by key only, ignoring any
- * window assignments.
- */
- public static class GroupByKeyOnly<K, V>
- extends PTransform<PCollection<KV<K, V>>,
- PCollection<KV<K, Iterable<V>>>> {
-
- @SuppressWarnings({"rawtypes", "unchecked"})
- @Override
- public PCollection<KV<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) {
- return PCollection.<KV<K, Iterable<V>>>createPrimitiveOutputInternal(
- input.getPipeline(), input.getWindowingStrategy(), input.isBounded());
- }
-
- /**
- * Returns the {@code Coder} of the input to this transform, which
- * should be a {@code KvCoder}.
- */
- @SuppressWarnings("unchecked")
- KvCoder<K, V> getInputKvCoder(Coder<KV<K, V>> inputCoder) {
- if (!(inputCoder instanceof KvCoder)) {
- throw new IllegalStateException(
- "GroupByKey requires its input to use KvCoder");
- }
- return (KvCoder<K, V>) inputCoder;
- }
-
- @Override
- protected Coder<KV<K, Iterable<V>>> getDefaultOutputCoder(PCollection<KV<K, V>> input) {
- return GroupByKey.getOutputKvCoder(input.getCoder());
- }
- }
-
-
- /////////////////////////////////////////////////////////////////////////////
-
- static {
- registerWithDirectPipelineRunner();
- }
-
- @SuppressWarnings({"rawtypes", "unchecked"})
- private static <K, V> void registerWithDirectPipelineRunner() {
- DirectPipelineRunner.registerDefaultTransformEvaluator(
- GroupByKeyOnly.class,
- new DirectPipelineRunner.TransformEvaluator<GroupByKeyOnly>() {
- @Override
- public void evaluate(
- GroupByKeyOnly transform,
- DirectPipelineRunner.EvaluationContext context) {
- evaluateHelper(transform, context);
- }
- });
- }
-
- private static <K, V> void evaluateHelper(
- GroupByKeyOnly<K, V> transform,
- DirectPipelineRunner.EvaluationContext context) {
- PCollection<KV<K, V>> input = context.getInput(transform);
-
- List<ValueWithMetadata<KV<K, V>>> inputElems =
- context.getPCollectionValuesWithMetadata(input);
-
- Coder<K> keyCoder = GroupByKey.getKeyCoder(input.getCoder());
-
- Map<GroupingKey<K>, List<V>> groupingMap = new HashMap<>();
-
- for (ValueWithMetadata<KV<K, V>> elem : inputElems) {
- K key = elem.getValue().getKey();
- V value = elem.getValue().getValue();
- byte[] encodedKey;
- try {
- encodedKey = encodeToByteArray(keyCoder, key);
- } catch (CoderException exn) {
- // TODO: Put in better element printing:
- // truncate if too long.
- throw new IllegalArgumentException(
- "unable to encode key " + key + " of input to " + transform +
- " using " + keyCoder,
- exn);
- }
- GroupingKey<K> groupingKey = new GroupingKey<>(key, encodedKey);
- List<V> values = groupingMap.get(groupingKey);
- if (values == null) {
- values = new ArrayList<V>();
- groupingMap.put(groupingKey, values);
- }
- values.add(value);
- }
-
- List<ValueWithMetadata<KV<K, Iterable<V>>>> outputElems =
- new ArrayList<>();
- for (Map.Entry<GroupingKey<K>, List<V>> entry : groupingMap.entrySet()) {
- GroupingKey<K> groupingKey = entry.getKey();
- K key = groupingKey.getKey();
- List<V> values = entry.getValue();
- values = context.randomizeIfUnordered(values, true /* inPlaceAllowed */);
- outputElems.add(ValueWithMetadata
- .of(WindowedValue.valueInEmptyWindows(KV.<K, Iterable<V>>of(key, values)))
- .withKey(key));
- }
-
- context.setPCollectionValuesWithMetadata(context.getOutput(transform),
- outputElems);
- }
-
- private static class GroupingKey<K> {
- private K key;
- private byte[] encodedKey;
-
- public GroupingKey(K key, byte[] encodedKey) {
- this.key = key;
- this.encodedKey = encodedKey;
- }
-
- public K getKey() {
- return key;
- }
-
- @Override
- public boolean equals(Object o) {
- if (o instanceof GroupingKey) {
- GroupingKey<?> that = (GroupingKey<?>) o;
- return Arrays.equals(this.encodedKey, that.encodedKey);
- } else {
- return false;
- }
- }
-
- @Override
- public int hashCode() {
- return Arrays.hashCode(encodedKey);
- }
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ca5b2def/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/GroupByKeyViaGroupByKeyOnly.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/GroupByKeyViaGroupByKeyOnly.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/GroupByKeyViaGroupByKeyOnly.java
new file mode 100644
index 0000000..c331931
--- /dev/null
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/GroupByKeyViaGroupByKeyOnly.java
@@ -0,0 +1,245 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.google.cloud.dataflow.sdk.util;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import com.google.cloud.dataflow.sdk.coders.IterableCoder;
+import com.google.cloud.dataflow.sdk.coders.KvCoder;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
+import com.google.cloud.dataflow.sdk.util.WindowedValue.FullWindowedValueCoder;
+import com.google.cloud.dataflow.sdk.util.WindowedValue.WindowedValueCoder;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+/**
+ * An implementation of {@link GroupByKey} built on top of a lower-level {@link GroupByKeyOnly}
+ * primitive.
+ *
+ * <p>This implementation of {@link GroupByKey} proceeds via the following steps:
+ * <ol>
+ * <li>{@code ReifyTimestampsAndWindows ParDo(ReifyTimestampAndWindowsDoFn)}: This embeds
+ * the previously-implicit timestamp and window into the elements themselves, so a
+ * window-and-timestamp-unaware transform can operate on them.</li>
+ * <li>{@code GroupByKeyOnly}: This lower-level primitive groups by keys, ignoring windows
+ * and timestamps. Many window-unaware runners have such a primitive already.</li>
+ * <li>{@code SortValuesByTimestamp}: A {@link ParDo} copies each iterable output by
+ * {@link GroupByKeyOnly} into an in-memory list and sorts it by timestamp.</li>
+ * <li>{@code GroupAlsoByWindow}: This step groups the sorted values by window. Today it is
+ * a {@link ParDo} of {@code GroupAlsoByWindowsViaOutputBufferDoFn}.</li>
+ * </ol>
+ *
+ * <p>This implementation of {@link GroupByKey} has severe limitations unless its component
+ * transforms are replaced. As-is, it is only applicable for in-memory runners using a batch-style
+ * execution strategy. Specifically:
+ *
+ * <ul>
+ * <li>Every iterable output by {@link GroupByKeyOnly} must contain all elements for that key.
+ * A streaming-style partitioning, which may emit several iterables for the same key, will
+ * not yield correct results.</li>
+ * <li>Sorting of values by timestamp is performed on an in-memory list. It will not succeed
+ * for large iterables.</li>
+ * <li>The implementation of {@code GroupAlsoByWindow} does not support timers. This is only
+ * appropriate for runners which also do not support timers.</li>
+ * </ul>
+ */
+public class GroupByKeyViaGroupByKeyOnly<K, V>
+ extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> {
+
+ private GroupByKey<K, V> gbkTransform;
+
+ public GroupByKeyViaGroupByKeyOnly(GroupByKey<K, V> originalTransform) {
+ this.gbkTransform = originalTransform;
+ }
+
+ @Override
+ public PCollection<KV<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) {
+ WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
+
+ return input
+ // Make each input element's timestamp and assigned windows
+ // explicit, in the value part.
+ .apply(new ReifyTimestampsAndWindows<K, V>())
+
+ // Group by just the key.
+ // No combiner lifting can happen here: the GroupByKeyOnly is always followed
+ // by the SortValuesByTimestamp and GroupAlsoByWindow ParDos below, never
+ // directly by a combiner.
+ .apply(new GroupByKeyOnly<K, WindowedValue<V>>())
+
+ // Sort each key's values by timestamp. GroupAlsoByWindow requires
+ // its input to be sorted by timestamp.
+ .apply(new SortValuesByTimestamp<K, V>())
+
+ // Group each key's values by window, merging windows as needed.
+ .apply(new GroupAlsoByWindow<K, V>(windowingStrategy))
+
+ // Apply the windowing strategy that the original GroupByKey would produce.
+ .setWindowingStrategyInternal(
+ gbkTransform.updateWindowingStrategy(windowingStrategy));
+ }
+
+ /**
+ * Runner-specific primitive that groups by key only, ignoring windows and timestamps. A
+ * runner that uses {@link GroupByKeyViaGroupByKeyOnly} should translate or evaluate this class
+ * as a primitive; {@link #apply} only creates an output with the input's windowing strategy.
+ */
+ public static class GroupByKeyOnly<K, V>
+ extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> {
+
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ @Override
+ public PCollection<KV<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) {
+ return PCollection.<KV<K, Iterable<V>>>createPrimitiveOutputInternal(
+ input.getPipeline(), input.getWindowingStrategy(), input.isBounded());
+ }
+
+ @Override
+ public Coder<KV<K, Iterable<V>>> getDefaultOutputCoder(PCollection<KV<K, V>> input) {
+ return GroupByKey.getOutputKvCoder(input.getCoder());
+ }
+ }
+
+ /**
+ * Helper transform that makes timestamps and window assignments explicit in the value part of
+ * each key/value pair, by wrapping each value in a {@link WindowedValue}.
+ */
+ public static class ReifyTimestampsAndWindows<K, V>
+ extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, WindowedValue<V>>>> {
+
+ @Override
+ public PCollection<KV<K, WindowedValue<V>>> apply(PCollection<KV<K, V>> input) {
+
+ // The requirement to use a KvCoder *is* actually a model-level requirement, not specific
+ // to this implementation of GBK. All runners need a way to get the key.
+ checkArgument(input.getCoder() instanceof KvCoder,
+ "%s requires its input to use a %s",
+ GroupByKey.class.getSimpleName(),
+ KvCoder.class.getSimpleName());
+
+ @SuppressWarnings("unchecked")
+ KvCoder<K, V> inputKvCoder = (KvCoder<K, V>) input.getCoder();
+ Coder<K> keyCoder = inputKvCoder.getKeyCoder();
+ Coder<V> inputValueCoder = inputKvCoder.getValueCoder();
+ Coder<WindowedValue<V>> outputValueCoder =
+ FullWindowedValueCoder.of(
+ inputValueCoder, input.getWindowingStrategy().getWindowFn().windowCoder());
+ Coder<KV<K, WindowedValue<V>>> outputKvCoder = KvCoder.of(keyCoder, outputValueCoder);
+ return input
+ .apply(ParDo.of(new ReifyTimestampAndWindowsDoFn<K, V>()))
+ .setCoder(outputKvCoder);
+ }
+ }
+
+ /**
+ * Helper transform that sorts the values associated with each key by timestamp, in memory.
+ */
+ private static class SortValuesByTimestamp<K, V>
+ extends PTransform<
+ PCollection<KV<K, Iterable<WindowedValue<V>>>>,
+ PCollection<KV<K, Iterable<WindowedValue<V>>>>> {
+ @Override
+ public PCollection<KV<K, Iterable<WindowedValue<V>>>> apply(
+ PCollection<KV<K, Iterable<WindowedValue<V>>>> input) {
+ return input
+ .apply(
+ ParDo.of(
+ new DoFn<KV<K, Iterable<WindowedValue<V>>>, KV<K, Iterable<WindowedValue<V>>>>() {
+ @Override
+ public void processElement(ProcessContext c) {
+ KV<K, Iterable<WindowedValue<V>>> kvs = c.element();
+ K key = kvs.getKey();
+ Iterable<WindowedValue<V>> unsortedValues = kvs.getValue();
+ List<WindowedValue<V>> sortedValues = new ArrayList<>();
+ for (WindowedValue<V> value : unsortedValues) {
+ sortedValues.add(value);
+ }
+ Collections.sort(
+ sortedValues,
+ new Comparator<WindowedValue<V>>() {
+ @Override
+ public int compare(WindowedValue<V> e1, WindowedValue<V> e2) {
+ return e1.getTimestamp().compareTo(e2.getTimestamp());
+ }
+ });
+ c.output(KV.<K, Iterable<WindowedValue<V>>>of(key, sortedValues));
+ }
+ }))
+ .setCoder(input.getCoder());
+ }
+ }
+
+ /**
+ * Helper transform that takes a collection of timestamp-ordered
+ * values associated with each key, groups the values by window,
+ * merges windows as needed, and for each window in each key,
+ * outputs a collection of key/value-list pairs implicitly assigned
+ * to the window and with the timestamp derived from that window.
+ */
+ private static class GroupAlsoByWindow<K, V>
+ extends PTransform<
+ PCollection<KV<K, Iterable<WindowedValue<V>>>>, PCollection<KV<K, Iterable<V>>>> {
+ private final WindowingStrategy<?, ?> windowingStrategy;
+
+ public GroupAlsoByWindow(WindowingStrategy<?, ?> windowingStrategy) {
+ this.windowingStrategy = windowingStrategy;
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public PCollection<KV<K, Iterable<V>>> apply(
+ PCollection<KV<K, Iterable<WindowedValue<V>>>> input) {
+ @SuppressWarnings("unchecked")
+ KvCoder<K, Iterable<WindowedValue<V>>> inputKvCoder =
+ (KvCoder<K, Iterable<WindowedValue<V>>>) input.getCoder();
+
+ Coder<K> keyCoder = inputKvCoder.getKeyCoder();
+ Coder<Iterable<WindowedValue<V>>> inputValueCoder = inputKvCoder.getValueCoder();
+
+ IterableCoder<WindowedValue<V>> inputIterableValueCoder =
+ (IterableCoder<WindowedValue<V>>) inputValueCoder;
+ Coder<WindowedValue<V>> inputIterableElementCoder = inputIterableValueCoder.getElemCoder();
+ WindowedValueCoder<V> inputIterableWindowedValueCoder =
+ (WindowedValueCoder<V>) inputIterableElementCoder;
+
+ Coder<V> inputIterableElementValueCoder = inputIterableWindowedValueCoder.getValueCoder();
+ Coder<Iterable<V>> outputValueCoder = IterableCoder.of(inputIterableElementValueCoder);
+ Coder<KV<K, Iterable<V>>> outputKvCoder = KvCoder.of(keyCoder, outputValueCoder);
+
+ return input
+ .apply(ParDo.of(groupAlsoByWindowsFn(windowingStrategy, inputIterableElementValueCoder)))
+ .setCoder(outputKvCoder);
+ }
+
+ private <W extends BoundedWindow>
+ GroupAlsoByWindowsViaOutputBufferDoFn<K, V, Iterable<V>, W> groupAlsoByWindowsFn(
+ WindowingStrategy<?, W> strategy, Coder<V> inputIterableElementValueCoder) {
+ return new GroupAlsoByWindowsViaOutputBufferDoFn<K, V, Iterable<V>, W>(
+ strategy, SystemReduceFn.<K, V, W>buffering(inputIterableElementValueCoder));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ca5b2def/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java
index 560d8ec..483e8c0 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java
@@ -18,7 +18,6 @@ package com.google.cloud.dataflow.sdk.util;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.transforms.Aggregator;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.GroupByKey.GroupByKeyOnly;
import com.google.cloud.dataflow.sdk.transforms.windowing.AfterWatermark;
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
import com.google.cloud.dataflow.sdk.transforms.windowing.OutputTimeFn;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ca5b2def/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactoryTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactoryTest.java
index 4ced82f..9933ec1 100644
--- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactoryTest.java
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactoryTest.java
@@ -26,7 +26,7 @@ import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.C
import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
import com.google.cloud.dataflow.sdk.testing.TestPipeline;
import com.google.cloud.dataflow.sdk.transforms.Create;
-import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
+import com.google.cloud.dataflow.sdk.util.GroupByKeyViaGroupByKeyOnly.ReifyTimestampsAndWindows;
import com.google.cloud.dataflow.sdk.util.KeyedWorkItem;
import com.google.cloud.dataflow.sdk.util.KeyedWorkItems;
import com.google.cloud.dataflow.sdk.util.WindowedValue;
@@ -60,7 +60,7 @@ public class GroupByKeyEvaluatorFactoryTest {
PCollection<KV<String, Integer>> values =
p.apply(Create.of(firstFoo, firstBar, secondFoo, firstBaz, secondBar, thirdFoo));
PCollection<KV<String, WindowedValue<Integer>>> kvs =
- values.apply(new GroupByKey.ReifyTimestampsAndWindows<String, Integer>());
+ values.apply(new ReifyTimestampsAndWindows<String, Integer>());
PCollection<KeyedWorkItem<String, Integer>> groupedKvs =
kvs.apply(new GroupByKeyEvaluatorFactory.InProcessGroupByKeyOnly<String, Integer>());
[2/3] incubator-beam git commit: Use Guava directly instead of via
Google API Client lib
Posted by ke...@apache.org.
Use Guava directly instead of via Google API Client lib
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4f99635a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4f99635a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4f99635a
Branch: refs/heads/master
Commit: 4f99635a331edddcbb34c22de23ae88783f21ca3
Parents: ca5b2de
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Mar 28 09:14:01 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue Apr 5 13:54:28 2016 -0700
----------------------------------------------------------------------
.../beam/runners/spark/translation/TransformTranslator.java | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4f99635a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
index ac59d00..8fe3b24 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
@@ -30,7 +30,6 @@ import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.getOutput
import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.getOutputFileTemplate;
import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.replaceShardCount;
-import com.google.api.client.util.Maps;
import com.google.cloud.dataflow.sdk.coders.CannotProvideCoderException;
import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.coders.KvCoder;
@@ -57,6 +56,8 @@ import com.google.cloud.dataflow.sdk.values.PCollectionTuple;
import com.google.cloud.dataflow.sdk.values.PCollectionView;
import com.google.cloud.dataflow.sdk.values.TupleTag;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapreduce.AvroJob;
[3/3] incubator-beam git commit: Fix ups of merge of #77
Posted by ke...@apache.org.
Fix ups of merge of #77
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c26eef5b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c26eef5b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c26eef5b
Branch: refs/heads/master
Commit: c26eef5beb809bee75ae98879b618a6530fc6ded
Parents: 6c34f3a 4f99635
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Apr 5 14:00:10 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue Apr 5 14:00:10 2016 -0700
----------------------------------------------------------------------
.../runners/spark/translation/TransformTranslator.java | 10 +++++-----
.../dataflow/sdk/util/GroupByKeyViaGroupByKeyOnly.java | 13 +++++++------
2 files changed, 12 insertions(+), 11 deletions(-)
----------------------------------------------------------------------