You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ec...@apache.org on 2019/09/27 10:00:48 UTC

[beam] branch spark-runner_structured-streaming updated (ab7d24c -> d093ffe)

This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a change to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git.


    from ab7d24c  Ignore long time failing test: SparkMetricsSinkTest
     new 5beb435  Apply new Encoders to Window assign translation
     new 6edcfa2  Apply new Encoders to AggregatorCombiner
     new d093ffe  Apply spotless

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../translation/batch/AggregatorCombiner.java      | 22 +++++++++++++++++-----
 .../batch/CombinePerKeyTranslatorBatch.java        | 20 ++++++++++++++++----
 .../batch/WindowAssignTranslatorBatch.java         |  8 ++++++--
 .../metrics/sink/SparkMetricsSinkTest.java         |  2 +-
 4 files changed, 40 insertions(+), 12 deletions(-)


[beam] 03/03: Apply spotless

Posted by ec...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit d093ffedcd38f5a00cf2e9dd3aee65b430a15dbd
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Fri Sep 27 11:55:43 2019 +0200

    Apply spotless
---
 .../translation/batch/WindowAssignTranslatorBatch.java                | 4 ++--
 .../aggregators/metrics/sink/SparkMetricsSinkTest.java                | 2 +-
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTranslatorBatch.java
index 576b914..59cc32a 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTranslatorBatch.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTranslatorBatch.java
@@ -46,8 +46,8 @@ class WindowAssignTranslatorBatch<T>
       context.putDataset(output, inputDataset);
     } else {
       WindowFn<T, ?> windowFn = assignTransform.getWindowFn();
-      WindowedValue.FullWindowedValueCoder<T> windoweVdalueCoder = WindowedValue.FullWindowedValueCoder
-          .of(input.getCoder(), windowFn.windowCoder());
+      WindowedValue.FullWindowedValueCoder<T> windoweVdalueCoder =
+          WindowedValue.FullWindowedValueCoder.of(input.getCoder(), windowFn.windowCoder());
       Dataset<WindowedValue<T>> outputDataset =
           inputDataset.map(
               WindowingHelpers.assignWindowsMapFunction(windowFn),
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/aggregators/metrics/sink/SparkMetricsSinkTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/aggregators/metrics/sink/SparkMetricsSinkTest.java
index 9d56f0c..de405a4 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/aggregators/metrics/sink/SparkMetricsSinkTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/aggregators/metrics/sink/SparkMetricsSinkTest.java
@@ -41,7 +41,7 @@ import org.junit.rules.ExternalResource;
  * <p>A test that verifies Beam metrics are reported to Spark's metrics sink in both batch and
  * streaming modes.
  */
-@Ignore ("Has been failing since at least c350188ef7a8704c7336f3c20a1ab2144abbcd4a")
+@Ignore("Has been failing since at least c350188ef7a8704c7336f3c20a1ab2144abbcd4a")
 public class SparkMetricsSinkTest {
   @Rule public ExternalResource inMemoryMetricsSink = new InMemoryMetricsSinkRule();
   @Rule public final TestPipeline pipeline = TestPipeline.create();


[beam] 02/03: Apply new Encoders to AggregatorCombiner

Posted by ec...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 6edcfa2dd2e00a43dd3961d87a783fdb195cdf37
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Fri Sep 27 11:55:20 2019 +0200

    Apply new Encoders to AggregatorCombiner
---
 .../translation/batch/AggregatorCombiner.java      | 22 +++++++++++++++++-----
 .../batch/CombinePerKeyTranslatorBatch.java        | 20 ++++++++++++++++----
 2 files changed, 33 insertions(+), 9 deletions(-)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombiner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombiner.java
index 0e3229e..d14569a 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombiner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/AggregatorCombiner.java
@@ -27,6 +27,8 @@ import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
@@ -52,13 +54,25 @@ class AggregatorCombiner<K, InputT, AccumT, OutputT, W extends BoundedWindow>
   private final Combine.CombineFn<InputT, AccumT, OutputT> combineFn;
   private WindowingStrategy<InputT, W> windowingStrategy;
   private TimestampCombiner timestampCombiner;
+  private IterableCoder<WindowedValue<AccumT>> accumulatorCoder;
+  private IterableCoder<WindowedValue<OutputT>> outputCoder;
 
   public AggregatorCombiner(
       Combine.CombineFn<InputT, AccumT, OutputT> combineFn,
-      WindowingStrategy<?, ?> windowingStrategy) {
+      WindowingStrategy<?, ?> windowingStrategy,
+      Coder<AccumT> accumulatorCoder,
+      Coder<OutputT> outputCoder) {
     this.combineFn = combineFn;
     this.windowingStrategy = (WindowingStrategy<InputT, W>) windowingStrategy;
     this.timestampCombiner = windowingStrategy.getTimestampCombiner();
+    this.accumulatorCoder =
+        IterableCoder.of(
+            WindowedValue.FullWindowedValueCoder.of(
+                accumulatorCoder, windowingStrategy.getWindowFn().windowCoder()));
+    this.outputCoder =
+        IterableCoder.of(
+            WindowedValue.FullWindowedValueCoder.of(
+                outputCoder, windowingStrategy.getWindowFn().windowCoder()));
   }
 
   @Override
@@ -142,14 +156,12 @@ class AggregatorCombiner<K, InputT, AccumT, OutputT, W extends BoundedWindow>
 
   @Override
   public Encoder<Iterable<WindowedValue<AccumT>>> bufferEncoder() {
-    // TODO replace with accumulatorCoder if possible
-    return EncoderHelpers.genericEncoder();
+    return EncoderHelpers.fromBeamCoder(accumulatorCoder);
   }
 
   @Override
   public Encoder<Iterable<WindowedValue<OutputT>>> outputEncoder() {
-    // TODO replace with outputCoder if possible
-    return EncoderHelpers.genericEncoder();
+    return EncoderHelpers.fromBeamCoder(outputCoder);
   }
 
   private Set<W> collectAccumulatorsWindows(Iterable<WindowedValue<AccumT>> accumulators) {
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java
index 33b037a..be238b5 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java
@@ -23,6 +23,7 @@ import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTr
 import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
 import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers;
 import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.KVHelpers;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.transforms.Combine;
@@ -58,20 +59,31 @@ class CombinePerKeyTranslatorBatch<K, InputT, AccumT, OutputT>
 
     Dataset<WindowedValue<KV<K, InputT>>> inputDataset = context.getDataset(input);
 
-    Coder<K> keyCoder = (Coder<K>) input.getCoder().getCoderArguments().get(0);
-    Coder<OutputT> outputTCoder = (Coder<OutputT>) output.getCoder().getCoderArguments().get(1);
+    KvCoder<K, InputT> inputCoder = (KvCoder<K, InputT>) input.getCoder();
+    Coder<K> keyCoder = inputCoder.getKeyCoder();
+    KvCoder<K, OutputT> outputKVCoder = (KvCoder<K, OutputT>) output.getCoder();
+    Coder<OutputT> outputCoder = outputKVCoder.getValueCoder();
 
     KeyValueGroupedDataset<K, WindowedValue<KV<K, InputT>>> groupedDataset =
         inputDataset.groupByKey(KVHelpers.extractKey(), EncoderHelpers.fromBeamCoder(keyCoder));
 
+    Coder<AccumT> accumulatorCoder = null;
+    try {
+      accumulatorCoder =
+          combineFn.getAccumulatorCoder(
+              input.getPipeline().getCoderRegistry(), inputCoder.getValueCoder());
+    } catch (CannotProvideCoderException e) {
+      throw new RuntimeException(e);
+    }
+
     Dataset<Tuple2<K, Iterable<WindowedValue<OutputT>>>> combinedDataset =
         groupedDataset.agg(
             new AggregatorCombiner<K, InputT, AccumT, OutputT, BoundedWindow>(
-                    combineFn, windowingStrategy)
+                    combineFn, windowingStrategy, accumulatorCoder, outputCoder)
                 .toColumn());
 
     // expand the list into separate elements and put the key back into the elements
-    Coder<KV<K, OutputT>> kvCoder = KvCoder.of(keyCoder, outputTCoder);
+    Coder<KV<K, OutputT>> kvCoder = KvCoder.of(keyCoder, outputCoder);
     WindowedValue.WindowedValueCoder<KV<K, OutputT>> wvCoder =
         WindowedValue.FullWindowedValueCoder.of(
             kvCoder, input.getWindowingStrategy().getWindowFn().windowCoder());


[beam] 01/03: Apply new Encoders to Window assign translation

Posted by ec...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 5beb435f46ea82ed0380e11e7751bdb6fbbbcee4
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Fri Sep 27 11:22:15 2019 +0200

    Apply new Encoders to Window assign translation
---
 .../translation/batch/WindowAssignTranslatorBatch.java            | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTranslatorBatch.java
index fb37f97..576b914 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTranslatorBatch.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTranslatorBatch.java
@@ -23,6 +23,7 @@ import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.Enc
 import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.WindowingHelpers;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.spark.sql.Dataset;
@@ -44,10 +45,13 @@ class WindowAssignTranslatorBatch<T>
     if (WindowingHelpers.skipAssignWindows(assignTransform, context)) {
       context.putDataset(output, inputDataset);
     } else {
+      WindowFn<T, ?> windowFn = assignTransform.getWindowFn();
+      WindowedValue.FullWindowedValueCoder<T> windoweVdalueCoder = WindowedValue.FullWindowedValueCoder
+          .of(input.getCoder(), windowFn.windowCoder());
       Dataset<WindowedValue<T>> outputDataset =
           inputDataset.map(
-              WindowingHelpers.assignWindowsMapFunction(assignTransform.getWindowFn()),
-              EncoderHelpers.windowedValueEncoder());
+              WindowingHelpers.assignWindowsMapFunction(windowFn),
+              EncoderHelpers.fromBeamCoder(windoweVdalueCoder));
       context.putDataset(output, outputDataset);
     }
   }