Posted to commits@beam.apache.org by ar...@apache.org on 2018/12/07 15:10:32 UTC

[beam] branch spark-runner_structured-streaming updated: Add primitive GroupByKeyTranslatorBatch implementation

This is an automated email from the ASF dual-hosted git repository.

aromanenko pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/spark-runner_structured-streaming by this push:
     new 7b00f7c  Add primitive GroupByKeyTranslatorBatch implementation
7b00f7c is described below

commit 7b00f7c6a8ef37e42d741b6954a6e9b87ea8fea7
Author: Alexey Romanenko <ar...@gmail.com>
AuthorDate: Fri Dec 7 10:54:12 2018 +0100

    Add primitive GroupByKeyTranslatorBatch implementation
---
 ...KeyTranslatorBatch.java => EncoderHelpers.java} | 22 ++++------
 .../translation/TranslationContext.java            |  4 +-
 .../batch/GroupByKeyTranslatorBatch.java           | 49 ++++++++++++++++++++--
 3 files changed, 56 insertions(+), 19 deletions(-)

diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/EncoderHelpers.java
similarity index 56%
copy from runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java
copy to runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/EncoderHelpers.java
index 4ee77fb..4c56922 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/EncoderHelpers.java
@@ -15,20 +15,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
+package org.apache.beam.runners.spark.structuredstreaming.translation;
 
-import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator;
-import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
+import org.apache.spark.sql.Encoder;
+import org.apache.spark.sql.Encoders;
 
-class GroupByKeyTranslatorBatch<K, InputT>
-    implements TransformTranslator<
-        PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, Iterable<InputT>>>>> {
+/** {@link Encoders} utility class. */
+public class EncoderHelpers {
 
-  @Override
-  public void translateTransform(
-      PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, Iterable<InputT>>>> transform,
-      TranslationContext context) {}
+  @SuppressWarnings("unchecked")
+  public static <T> Encoder<T> encoder() {
+    return Encoders.kryo((Class<T>) Object.class);
+  }
 }
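
For context, the helper above returns a Kryo-backed binary Encoder so that arbitrary Beam types, for which Spark cannot derive a bean or case-class encoder, can still be held in a Dataset. A minimal standalone sketch of the same idea follows; the class name, session setup, and sample data are illustrative assumptions and are not part of this commit.

    import java.util.Arrays;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Encoder;
    import org.apache.spark.sql.Encoders;
    import org.apache.spark.sql.SparkSession;

    public class EncoderHelpersSketch {

      // Same pattern as EncoderHelpers.encoder() above: a Kryo-backed encoder that
      // serializes any object to bytes, at the cost of losing the column-level
      // schema Spark would otherwise use for optimization.
      @SuppressWarnings("unchecked")
      static <T> Encoder<T> encoder() {
        return Encoders.kryo((Class<T>) Object.class);
      }

      public static void main(String[] args) {
        SparkSession spark =
            SparkSession.builder().master("local[*]").appName("encoder-sketch").getOrCreate();

        // Any Kryo-serializable value can now be stored in a Dataset as binary.
        Dataset<int[]> arrays =
            spark.createDataset(Arrays.asList(new int[] {1, 2}, new int[] {3}), encoder());

        System.out.println(arrays.count()); // prints 2
        spark.stop();
      }
    }
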
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
index 3c29867..e66bc90 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
@@ -46,9 +46,9 @@ import org.apache.spark.sql.streaming.StreamingQueryException;
  */
 public class TranslationContext {
 
-  /** All the datasets of the DAG */
+  /** All the datasets of the DAG. */
   private final Map<PValue, Dataset<?>> datasets;
-  /** datasets that are not used as input to other datasets (leaves of the DAG) */
+  /** datasets that are not used as input to other datasets (leaves of the DAG). */
   private final Set<Dataset<?>> leaves;
 
   private final SparkPipelineOptions options;
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java
index 4ee77fb..7f2d7fa 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java
@@ -17,18 +17,59 @@
  */
 package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
 
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import java.util.List;
+import org.apache.beam.runners.spark.structuredstreaming.translation.EncoderHelpers;
 import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator;
 import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.spark.api.java.function.MapFunction;
+import org.apache.spark.api.java.function.MapGroupsFunction;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.KeyValueGroupedDataset;
 
-class GroupByKeyTranslatorBatch<K, InputT>
+class GroupByKeyTranslatorBatch<K, V>
     implements TransformTranslator<
-        PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, Iterable<InputT>>>>> {
+        PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>>> {
 
   @Override
   public void translateTransform(
-      PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, Iterable<InputT>>>> transform,
-      TranslationContext context) {}
+      PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> transform,
+      TranslationContext context) {
+
+    Dataset<WindowedValue<KV<K, V>>> input = context.getDataset(context.getInput());
+
+    // group by key only.
+    KeyValueGroupedDataset<K, KV<K, V>> grouped =
+        input
+            .map(
+                (MapFunction<WindowedValue<KV<K, V>>, KV<K, V>>) WindowedValue::getValue,
+                EncoderHelpers.encoder())
+            .groupByKey((MapFunction<KV<K, V>, K>) KV::getKey, EncoderHelpers.<K>encoder());
+
+    Dataset<KV<K, Iterable<V>>> materialized =
+        grouped.mapGroups(
+            (MapGroupsFunction<K, KV<K, V>, KV<K, Iterable<V>>>)
+                (key, iterator) -> {
+                  // TODO: can we use here just "Iterable<V> iterable = () -> iterator;" ?
+                  List<V> values = Lists.newArrayList();
+                  while (iterator.hasNext()) {
+                    values.add(iterator.next().getValue());
+                  }
+                  return KV.of(key, Iterables.unmodifiableIterable(values));
+                },
+            EncoderHelpers.encoder());
+
+    Dataset<WindowedValue<KV<K, Iterable<V>>>> output =
+        materialized.map(
+            (MapFunction<KV<K, Iterable<V>>, WindowedValue<KV<K, Iterable<V>>>>)
+                WindowedValue::valueInGlobalWindow,
+            EncoderHelpers.encoder());
+
+    context.putDataset(context.getOutput(), output);
+  }
 }
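
Read as a whole, the new translateTransform extracts the value out of each WindowedValue, groups by the KV key, buffers each group's values into a list, and re-wraps the result in the global window. Below is a standalone sketch of the same map/groupByKey/mapGroups shape on plain Spark types; all names and sample data are illustrative, not part of this commit. It also suggests why the TODO's lazy "() -> iterator" is not a drop-in replacement: mapGroups hands over a one-shot Iterator of key/value pairs rather than of bare values, so the values have to be unwrapped either way, and buffering them keeps the resulting Iterable re-iterable.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import org.apache.spark.api.java.function.MapFunction;
    import org.apache.spark.api.java.function.MapGroupsFunction;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Encoder;
    import org.apache.spark.sql.Encoders;
    import org.apache.spark.sql.KeyValueGroupedDataset;
    import org.apache.spark.sql.SparkSession;
    import scala.Tuple2;

    public class GroupByKeySketch {

      public static void main(String[] args) {
        SparkSession spark =
            SparkSession.builder().master("local[*]").appName("gbk-sketch").getOrCreate();

        // Kryo-backed encoder for the pair type, mirroring EncoderHelpers.encoder().
        @SuppressWarnings("unchecked")
        Encoder<Tuple2<String, Integer>> pairEncoder =
            Encoders.kryo((Class<Tuple2<String, Integer>>) (Class<?>) Tuple2.class);

        Dataset<Tuple2<String, Integer>> input =
            spark.createDataset(
                Arrays.asList(new Tuple2<>("a", 1), new Tuple2<>("a", 2), new Tuple2<>("b", 3)),
                pairEncoder);

        // Group by key only, as the translator does.
        KeyValueGroupedDataset<String, Tuple2<String, Integer>> grouped =
            input.groupByKey(
                (MapFunction<Tuple2<String, Integer>, String>) pair -> pair._1(),
                Encoders.STRING());

        // Each key gets a one-shot Iterator over its pairs; the values are
        // unwrapped and buffered into a List, as in the commit above.
        Dataset<String> result =
            grouped.mapGroups(
                (MapGroupsFunction<String, Tuple2<String, Integer>, String>)
                    (key, pairs) -> {
                      List<Integer> values = new ArrayList<>();
                      while (pairs.hasNext()) {
                        values.add(pairs.next()._2());
                      }
                      return key + " -> " + values;
                    },
                Encoders.STRING());

        result.show(false); // e.g. "a -> [1, 2]" and "b -> [3]"
        spark.stop();
      }
    }
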