Posted to commits@beam.apache.org by ec...@apache.org on 2019/05/28 07:33:15 UTC

[beam] branch spark-runner_structured-streaming updated (728aa1f -> 09e6207)

This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a change to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git.


    from 728aa1f  Consider null object case on RowHelpers, fixes empty side inputs tests.
     new 6ae2056  Put back batch/simpleSourceTest.testBoundedSource
     new 982197c  Update windowAssignTest
     new 6ba3d1c  Add comment about checkpoint mark
     new 89df2bf  Re-code GroupByKeyTranslatorBatch to conserve windowing instead of unwindowing/windowing(GlobalWindow): simplify code, use ReduceFnRunner to merge the windows
     new c947455  re-enable reduceFnRunner timers for output
     new 09e6207  Improve visibility of debug messages

The 6 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../translation/TranslationContext.java            |  2 +-
 .../batch/GroupByKeyTranslatorBatch.java           | 74 ++++++++++++++--------
 .../GroupAlsoByWindowViaOutputBufferFn.java}       | 21 +++---
 .../streaming/DatasetSourceStreaming.java          |  1 +
 .../translation/batch/SimpleSourceTest.java        | 11 +++-
 .../translation/batch/WindowAssignTest.java        | 19 +++---
 6 files changed, 78 insertions(+), 50 deletions(-)
 copy runners/spark/src/main/java/org/apache/beam/runners/spark/{translation/SparkGroupAlsoByWindowViaOutputBufferFn.java => structuredstreaming/translation/batch/functions/GroupAlsoByWindowViaOutputBufferFn.java} (88%)


[beam] 04/06: Re-code GroupByKeyTranslatorBatch to conserve windowing instead of unwindowing/windowing(GlobalWindow): simplify code, use ReduceFnRunner to merge the windows

Posted by ec...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 89df2bf87c7dc926ff9c0a4cb38b83fed1a545a1
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Mon May 27 15:10:02 2019 +0200

    Re-code GroupByKeyTranslatorBatch to conserve windowing instead of unwindowing/windowing(GlobalWindow): simplify code, use ReduceFnRunner to merge the windows
---
 .../batch/GroupByKeyTranslatorBatch.java           |  74 ++++++---
 .../GroupAlsoByWindowViaOutputBufferFn.java        | 168 +++++++++++++++++++++
 2 files changed, 217 insertions(+), 25 deletions(-)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java
index 48cceee..b2b4441 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/GroupByKeyTranslatorBatch.java
@@ -17,16 +17,23 @@
  */
 package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
 
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
+import org.apache.beam.runners.core.InMemoryStateInternals;
+import org.apache.beam.runners.core.StateInternals;
+import org.apache.beam.runners.core.StateInternalsFactory;
+import org.apache.beam.runners.core.SystemReduceFn;
 import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator;
 import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
+import org.apache.beam.runners.spark.structuredstreaming.translation.batch.functions.GroupAlsoByWindowViaOutputBufferFn;
 import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers;
-import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.WindowingHelpers;
+import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
 import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.api.java.function.MapGroupsFunction;
@@ -42,34 +49,51 @@ class GroupByKeyTranslatorBatch<K, V>
       PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> transform,
       TranslationContext context) {
 
-    Dataset<WindowedValue<KV<K, V>>> input = context.getDataset(context.getInput());
+    @SuppressWarnings("unchecked")
+    final PCollection<KV<K, V>> inputPCollection = (PCollection<KV<K, V>>) context.getInput();
 
-    // Extract key to group by key only.
-    KeyValueGroupedDataset<K, KV<K, V>> grouped =
-        input
-            .map(WindowingHelpers.unwindowMapFunction(), EncoderHelpers.kvEncoder())
-            .groupByKey((MapFunction<KV<K, V>, K>) KV::getKey, EncoderHelpers.genericEncoder());
+    Dataset<WindowedValue<KV<K, V>>> input = context.getDataset(inputPCollection);
 
-    // Materialize grouped values, potential OOM because of creation of new iterable
-    Dataset<KV<K, Iterable<V>>> materialized =
-        grouped.mapGroups(
-            (MapGroupsFunction<K, KV<K, V>, KV<K, Iterable<V>>>)
-                // TODO: We need to improve this part and avoid creating of new List (potential OOM)
-                // (key, iterator) -> KV.of(key, () -> Iterators.transform(iterator, KV::getValue)),
-                (key, iterator) -> {
-                  List<V> values = new ArrayList<>();
-                  while (iterator.hasNext()) {
-                    values.add(iterator.next().getValue());
-                  }
-                  return KV.of(key, Iterables.unmodifiableIterable(values));
-                },
-            EncoderHelpers.kvEncoder());
+    // group by key only
+    KeyValueGroupedDataset<K, WindowedValue<KV<K, V>>> groupByKeyOnly = input
+        .groupByKey((MapFunction<WindowedValue<KV<K, V>>, K>) wv -> wv.getValue().getKey(),
+            EncoderHelpers.genericEncoder());
 
-    // Window the result into global window.
-    Dataset<WindowedValue<KV<K, Iterable<V>>>> output =
-        materialized.map(
-            WindowingHelpers.windowMapFunction(), EncoderHelpers.windowedValueEncoder());
+    // Materialize groupByKeyOnly values, potential OOM because of creation of new iterable
+    Dataset<KV<K, Iterable<WindowedValue<V>>>> materialized = groupByKeyOnly.mapGroups(
+        (MapGroupsFunction<K, WindowedValue<KV<K, V>>, KV<K, Iterable<WindowedValue<V>>>>) (key, iterator) -> {
+          List<WindowedValue<V>> values = new ArrayList<>();
+          while (iterator.hasNext()) {
+            WindowedValue<KV<K, V>> next = iterator.next();
+            values.add(WindowedValue
+                .of(next.getValue().getValue(), next.getTimestamp(), next.getWindows(),
+                    next.getPane()));
+          }
+          KV<K, Iterable<WindowedValue<V>>> kv = KV.of(key, Iterables.unmodifiableIterable(values));
+          return kv;
+        }, EncoderHelpers.kvEncoder());
+
+    WindowingStrategy<?, ?> windowingStrategy = inputPCollection.getWindowingStrategy();
+    KvCoder<K, V> coder = (KvCoder<K, V>) inputPCollection.getCoder();
+    // group also by windows
+    Dataset<WindowedValue<KV<K, Iterable<V>>>> output = materialized.flatMap(
+        new GroupAlsoByWindowViaOutputBufferFn<>(windowingStrategy,
+            new InMemoryStateInternalsFactory<>(), SystemReduceFn.buffering(coder.getValueCoder()),
+            context.getSerializableOptions()), EncoderHelpers.windowedValueEncoder());
 
     context.putDataset(context.getOutput(), output);
   }
+
+  /**
+   * In-memory state internals factory.
+   *
+   * @param <K> State key type.
+   */
+  static class InMemoryStateInternalsFactory<K> implements StateInternalsFactory<K>, Serializable {
+    @Override
+    public StateInternals stateInternalsForKey(K key) {
+      return InMemoryStateInternals.forKey(key);
+    }
+  }
+
 }
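
As an aside for readers less familiar with the Spark Dataset API used above, the sketch below reproduces the same "group by key only, then materialize each group" shape outside Beam. It is a minimal illustration, not part of the commit: the local SparkSession, class name, and sample data are all assumptions, and it keeps the same caveat as the diff -- each group is pulled into an in-memory List, hence the potential OOM noted in the comments.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import org.apache.spark.api.java.function.MapFunction;
    import org.apache.spark.api.java.function.MapGroupsFunction;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Encoders;
    import org.apache.spark.sql.KeyValueGroupedDataset;
    import org.apache.spark.sql.SparkSession;
    import scala.Tuple2;

    public class GroupByKeyOnlySketch {
      public static void main(String[] args) {
        SparkSession spark =
            SparkSession.builder().master("local[2]").appName("gbk-sketch").getOrCreate();

        Dataset<Tuple2<String, Integer>> input =
            spark.createDataset(
                Arrays.asList(new Tuple2<>("a", 1), new Tuple2<>("a", 2), new Tuple2<>("b", 3)),
                Encoders.tuple(Encoders.STRING(), Encoders.INT()));

        // Group by key only; windowing plays no role at this stage.
        KeyValueGroupedDataset<String, Tuple2<String, Integer>> grouped =
            input.groupByKey(
                (MapFunction<Tuple2<String, Integer>, String>) Tuple2::_1, Encoders.STRING());

        // Materialize each group into a List -- the potential-OOM step flagged in the diff.
        Dataset<String> materialized =
            grouped.mapGroups(
                (MapGroupsFunction<String, Tuple2<String, Integer>, String>)
                    (key, it) -> {
                      List<Integer> vals = new ArrayList<>();
                      it.forEachRemaining(t -> vals.add(t._2()));
                      return key + " -> " + vals;
                    },
                Encoders.STRING());

        materialized.show(false);
        spark.stop();
      }
    }

In the translator itself the grouped values stay wrapped in WindowedValue and are handed to GroupAlsoByWindowViaOutputBufferFn (next file) rather than printed.
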
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/functions/GroupAlsoByWindowViaOutputBufferFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/functions/GroupAlsoByWindowViaOutputBufferFn.java
new file mode 100644
index 0000000..2fb08f5
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/functions/GroupAlsoByWindowViaOutputBufferFn.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.structuredstreaming.translation.batch.functions;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.beam.runners.core.InMemoryTimerInternals;
+import org.apache.beam.runners.core.OutputWindowedValue;
+import org.apache.beam.runners.core.ReduceFnRunner;
+import org.apache.beam.runners.core.StateInternals;
+import org.apache.beam.runners.core.StateInternalsFactory;
+import org.apache.beam.runners.core.SystemReduceFn;
+import org.apache.beam.runners.core.UnsupportedSideInputReader;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
+import org.apache.beam.runners.core.construction.TriggerTranslation;
+import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
+import org.apache.beam.runners.core.triggers.TriggerStateMachines;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.joda.time.Instant;
+
+/** A FlatMap function that groups by windows in batch mode using {@link ReduceFnRunner}. */
+public class GroupAlsoByWindowViaOutputBufferFn<K, InputT, W extends BoundedWindow>
+    implements FlatMapFunction<
+        KV<K, Iterable<WindowedValue<InputT>>>,
+        WindowedValue<KV<K, Iterable<InputT>>>> {
+
+  private final WindowingStrategy<?, W> windowingStrategy;
+  private final StateInternalsFactory<K> stateInternalsFactory;
+  private final SystemReduceFn<K, InputT, Iterable<InputT>, Iterable<InputT>, W> reduceFn;
+  private final SerializablePipelineOptions options;
+
+  public GroupAlsoByWindowViaOutputBufferFn(
+      WindowingStrategy<?, W> windowingStrategy,
+      StateInternalsFactory<K> stateInternalsFactory,
+      SystemReduceFn<K, InputT, Iterable<InputT>, Iterable<InputT>, W> reduceFn,
+      SerializablePipelineOptions options) {
+    this.windowingStrategy = windowingStrategy;
+    this.stateInternalsFactory = stateInternalsFactory;
+    this.reduceFn = reduceFn;
+    this.options = options;
+  }
+
+  @Override
+  public Iterator<WindowedValue<KV<K, Iterable<InputT>>>> call(
+      KV<K, Iterable<WindowedValue<InputT>>> kv) throws Exception {
+    K key = kv.getKey();
+    Iterable<WindowedValue<InputT>> values = kv.getValue();
+
+    // ------ based on GroupAlsoByWindowsViaOutputBufferDoFn ------//
+
+    // Used with Batch, we know that all the data is available for this key. We can't use the
+    // timer manager from the context because it doesn't exist. So we create one and emulate the
+    // watermark, knowing that we have all data and it is in timestamp order.
+    InMemoryTimerInternals timerInternals = new InMemoryTimerInternals();
+    timerInternals.advanceProcessingTime(Instant.now());
+    timerInternals.advanceSynchronizedProcessingTime(Instant.now());
+    StateInternals stateInternals = stateInternalsFactory.stateInternalsForKey(key);
+    GABWOutputWindowedValue<K, InputT> outputter = new GABWOutputWindowedValue<>();
+
+    ReduceFnRunner<K, InputT, Iterable<InputT>, W> reduceFnRunner =
+        new ReduceFnRunner<>(
+            key,
+            windowingStrategy,
+            ExecutableTriggerStateMachine.create(
+                TriggerStateMachines.stateMachineForTrigger(
+                    TriggerTranslation.toProto(windowingStrategy.getTrigger()))),
+            stateInternals,
+            timerInternals,
+            outputter,
+            new UnsupportedSideInputReader("GroupAlsoByWindow"),
+            reduceFn,
+            options.get());
+
+    // Process the grouped values.
+    reduceFnRunner.processElements(values);
+
+    // Finish any pending windows by advancing the input watermark to infinity.
+    timerInternals.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE);
+
+    // not supported yet
+/*
+    // Finally, advance the processing time to infinity to fire any timers.
+    timerInternals.advanceProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
+    timerInternals.advanceSynchronizedProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
+
+    fireEligibleTimers(timerInternals, reduceFnRunner);
+*/
+
+    reduceFnRunner.persist();
+
+    return outputter.getOutputs().iterator();
+  }
+
+/*  private void fireEligibleTimers(
+      InMemoryTimerInternals timerInternals,
+      ReduceFnRunner<K, InputT, Iterable<InputT>, W> reduceFnRunner)
+      throws Exception {
+    List<TimerInternals.TimerData> timers = new ArrayList<>();
+    while (true) {
+      TimerInternals.TimerData timer;
+      while ((timer = timerInternals.removeNextEventTimer()) != null) {
+        timers.add(timer);
+      }
+      while ((timer = timerInternals.removeNextProcessingTimer()) != null) {
+        timers.add(timer);
+      }
+      while ((timer = timerInternals.removeNextSynchronizedProcessingTimer()) != null) {
+        timers.add(timer);
+      }
+      if (timers.isEmpty()) {
+        break;
+      }
+      reduceFnRunner.onTimers(timers);
+      timers.clear();
+    }
+  }*/
+
+  private static class GABWOutputWindowedValue<K, V>
+      implements OutputWindowedValue<KV<K, Iterable<V>>> {
+    private final List<WindowedValue<KV<K, Iterable<V>>>> outputs = new ArrayList<>();
+
+    @Override
+    public void outputWindowedValue(
+        KV<K, Iterable<V>> output,
+        Instant timestamp,
+        Collection<? extends BoundedWindow> windows,
+        PaneInfo pane) {
+      outputs.add(WindowedValue.of(output, timestamp, windows, pane));
+    }
+
+    @Override
+    public <AdditionalOutputT> void outputWindowedValue(
+        TupleTag<AdditionalOutputT> tag,
+        AdditionalOutputT output,
+        Instant timestamp,
+        Collection<? extends BoundedWindow> windows,
+        PaneInfo pane) {
+      throw new UnsupportedOperationException("GroupAlsoByWindow should not use tagged outputs.");
+    }
+
+    Iterable<WindowedValue<KV<K, Iterable<V>>>> getOutputs() {
+      return outputs;
+    }
+  }
+}
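
The batch trick in call() above -- build an InMemoryTimerInternals, process every element for the key, then advance the input watermark to the end of time -- is easier to see in isolation. A minimal sketch, with the class name assumed and method shapes as in the runners-core version this branch builds against:

    import org.apache.beam.runners.core.InMemoryTimerInternals;
    import org.apache.beam.runners.core.StateNamespaces;
    import org.apache.beam.runners.core.TimerInternals.TimerData;
    import org.apache.beam.sdk.state.TimeDomain;
    import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
    import org.joda.time.Instant;

    public class WatermarkEmulationSketch {
      public static void main(String[] args) throws Exception {
        InMemoryTimerInternals timers = new InMemoryTimerInternals();
        // An event-time timer at t=10, like the end-of-window timers ReduceFnRunner sets.
        timers.setTimer(
            TimerData.of(StateNamespaces.global(), new Instant(10), TimeDomain.EVENT_TIME));
        // Not eligible yet: the input watermark still sits at the beginning of time.
        System.out.println(timers.removeNextEventTimer()); // null
        // In batch all data for the key is known, so the watermark may jump straight to
        // the end of time, making every event-time timer eligible.
        timers.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE);
        System.out.println(timers.removeNextEventTimer()); // the timer set above
      }
    }

This also shows why the commented-out fireEligibleTimers block matters: in this commit the timers become eligible but nothing drains them, so no panes are emitted; commit 05/06 below re-enables the draining loop.
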


[beam] 06/06: Improve visibility of debug messages

Posted by ec...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 09e62077dab53fd69cd0151eb0f9404c82112ef8
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Tue May 28 09:31:59 2019 +0200

    Improve visibility of debug messages
---
 .../spark/structuredstreaming/translation/TranslationContext.java       | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
index 4d17120..411aec3 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java
@@ -201,7 +201,7 @@ public class TranslationContext {
             // code.
             List<WindowedValue> windowedValues = ((Dataset<WindowedValue>) dataset).collectAsList();
             for (WindowedValue windowedValue : windowedValues) {
-              LOG.debug(windowedValue.toString());
+              LOG.debug("**** dataset content {} ****", windowedValue.toString());
             }
           } else {
             // apply a dummy fn just to apply for each action that will trigger the pipeline run in
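
One detail worth knowing about the changed line: SLF4J's {} placeholders are evaluated lazily, but the explicit toString() in the argument list runs eagerly even when debug logging is off. A micro-sketch of the distinction (class and logger names are illustrative):

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    class LazyLoggingSketch {
      private static final Logger LOG = LoggerFactory.getLogger(LazyLoggingSketch.class);

      static void log(Object windowedValue) {
        // String conversion happens only if debug is enabled:
        LOG.debug("**** dataset content {} ****", windowedValue);
        // String conversion happens unconditionally:
        LOG.debug("**** dataset content {} ****", windowedValue.toString());
      }
    }
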


[beam] 02/06: Update windowAssignTest

Posted by ec...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 982197ce985197e4153d79938db247c84d708fc7
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Tue May 7 09:35:46 2019 +0200

    Update windowAssignTest
---
 .../translation/batch/WindowAssignTest.java           | 19 ++++++++-----------
 1 file changed, 8 insertions(+), 11 deletions(-)

diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTest.java
index 61da3ea..3011d88 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTest.java
@@ -27,13 +27,11 @@ import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TimestampedValue;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -50,20 +48,19 @@ public class WindowAssignTest implements Serializable {
     p = Pipeline.create(options);
   }
 
-  @Ignore
   @Test
   public void testWindowAssign() {
-    PCollection<KV<Integer, Integer>> input =
+    PCollection<Integer> input =
         p.apply(
                 Create.timestamped(
-                    TimestampedValue.of(KV.of(1, 1), new Instant(1)),
-                    TimestampedValue.of(KV.of(1, 2), new Instant(2)),
-                    TimestampedValue.of(KV.of(1, 3), new Instant(3)),
-                    TimestampedValue.of(KV.of(1, 4), new Instant(10)),
-                    TimestampedValue.of(KV.of(1, 5), new Instant(11))))
+                    TimestampedValue.of(1, new Instant(1)),
+                    TimestampedValue.of(2, new Instant(2)),
+                    TimestampedValue.of(3, new Instant(3)),
+                    TimestampedValue.of(4, new Instant(10)),
+                    TimestampedValue.of(5, new Instant(11))))
             .apply(Window.into(FixedWindows.of(Duration.millis(10))))
-            .apply(Sum.integersPerKey());
-    PAssert.that(input).containsInAnyOrder(KV.of(1, 6), KV.of(1, 9));
+            .apply(Sum.integersGlobally().withoutDefaults());
+    PAssert.that(input).containsInAnyOrder(6, 9);
     p.run();
   }
 }
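
A note on the switch from Sum.integersPerKey() to Sum.integersGlobally().withoutDefaults(): Combine.globally (which Sum.integersGlobally() builds on) emits a default value for empty input, and that default is only well-defined in the GlobalWindow, so a PCollection under FixedWindows must opt out. A minimal standalone pairing (assumes a runner such as the direct runner on the classpath):

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.Sum;
    import org.apache.beam.sdk.transforms.windowing.FixedWindows;
    import org.apache.beam.sdk.transforms.windowing.Window;
    import org.apache.beam.sdk.values.TimestampedValue;
    import org.joda.time.Duration;
    import org.joda.time.Instant;

    public class WindowedSumSketch {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create();
        p.apply(
                Create.timestamped(
                    TimestampedValue.of(1, new Instant(1)),
                    TimestampedValue.of(2, new Instant(11))))
            .apply(Window.into(FixedWindows.of(Duration.millis(10))))
            // Omitting withoutDefaults() here fails at construction time, because the
            // default value for empty input is only defined for the GlobalWindow.
            .apply(Sum.integersGlobally().withoutDefaults());
        p.run().waitUntilFinish();
      }
    }
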


[beam] 01/06: Put back batch/simpleSourceTest.testBoundedSource

Posted by ec...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 6ae20566886df887761bb01bb49392e9d99b79f5
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Tue Apr 30 15:10:09 2019 +0200

    Put back batch/simpleSourceTest.testBoundedSource
---
 .../translation/batch/SimpleSourceTest.java                   | 11 ++++++++++-
 1 file changed, 10 insertions(+), 1 deletion(-)

diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/SimpleSourceTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/SimpleSourceTest.java
index 8bd5b24..51be8e3 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/SimpleSourceTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/SimpleSourceTest.java
@@ -31,6 +31,9 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.PCollection;
 import org.apache.spark.sql.sources.v2.DataSourceOptions;
 import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
 import org.junit.BeforeClass;
@@ -85,4 +88,10 @@ public class SimpleSourceTest implements Serializable {
         new DatasetSourceBatch().createReader(new DataSourceOptions(dataSourceOptions));
     SerializationDebugger.testSerialization(objectToTest, TEMPORARY_FOLDER.newFile());
   }
-}
+
+  @Test
+  public void testBoundedSource() {
+    PCollection<Integer> input = p.apply(Create.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
+    PAssert.that(input).containsInAnyOrder(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
+    p.run();
+  }}


[beam] 03/06: Add comment about checkpoint mark

Posted by ec...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 6ba3d1cb0d09ea0aa7f6a5f308fd59ccceae9d47
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Wed May 15 14:24:11 2019 +0200

    Add comment about checkpoint mark
---
 .../translation/streaming/DatasetSourceStreaming.java                    | 1 +
 1 file changed, 1 insertion(+)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/DatasetSourceStreaming.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/DatasetSourceStreaming.java
index 234eecf..1cf52ba 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/DatasetSourceStreaming.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/DatasetSourceStreaming.java
@@ -134,6 +134,7 @@ class DatasetSourceStreaming implements DataSourceV2, MicroBatchReadSupport {
       // offsets are ignored see javadoc
       for (DatasetPartitionReader partitionReader : partitionReaders) {
         try {
+          // TODO: is checkpointMark stored in reliable storage?
           partitionReader.reader.getCheckpointMark().finalizeCheckpoint();
         } catch (IOException e) {
           throw new RuntimeException(


[beam] 05/06: re-enable reduceFnRunner timers for output

Posted by ec...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit c9474558092611116f308d7b824ee0bb5c11ecb7
Author: Etienne Chauchot <ec...@apache.org>
AuthorDate: Tue May 28 09:31:28 2019 +0200

    re-enable reduceFnRunner timers for output
---
 .../batch/functions/GroupAlsoByWindowViaOutputBufferFn.java       | 8 +++-----
 1 file changed, 3 insertions(+), 5 deletions(-)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/functions/GroupAlsoByWindowViaOutputBufferFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/functions/GroupAlsoByWindowViaOutputBufferFn.java
index 2fb08f5..cc65716 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/functions/GroupAlsoByWindowViaOutputBufferFn.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/functions/GroupAlsoByWindowViaOutputBufferFn.java
@@ -27,6 +27,7 @@ import org.apache.beam.runners.core.ReduceFnRunner;
 import org.apache.beam.runners.core.StateInternals;
 import org.apache.beam.runners.core.StateInternalsFactory;
 import org.apache.beam.runners.core.SystemReduceFn;
+import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.runners.core.UnsupportedSideInputReader;
 import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
 import org.apache.beam.runners.core.construction.TriggerTranslation;
@@ -100,21 +101,18 @@ public class GroupAlsoByWindowViaOutputBufferFn<K, InputT, W extends BoundedWind
     // Finish any pending windows by advancing the input watermark to infinity.
     timerInternals.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE);
 
-    // not supported yet
-/*
     // Finally, advance the processing time to infinity to fire any timers.
     timerInternals.advanceProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
     timerInternals.advanceSynchronizedProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
 
     fireEligibleTimers(timerInternals, reduceFnRunner);
-*/
 
     reduceFnRunner.persist();
 
     return outputter.getOutputs().iterator();
   }
 
-/*  private void fireEligibleTimers(
+  private void fireEligibleTimers(
       InMemoryTimerInternals timerInternals,
       ReduceFnRunner<K, InputT, Iterable<InputT>, W> reduceFnRunner)
       throws Exception {
@@ -136,7 +134,7 @@ public class GroupAlsoByWindowViaOutputBufferFn<K, InputT, W extends BoundedWind
       reduceFnRunner.onTimers(timers);
       timers.clear();
     }
-  }*/
+  }
 
   private static class GABWOutputWindowedValue<K, V>
       implements OutputWindowedValue<KV<K, Iterable<V>>> {
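
With the timer-draining loop re-enabled, the function can be exercised end to end without a Spark cluster. Below is a minimal driver sketch -- not part of any commit; the class name, key, and sample data are assumptions -- that feeds one key across two fixed windows and prints the per-window panes:

    import java.util.Arrays;
    import java.util.Iterator;
    import org.apache.beam.runners.core.InMemoryStateInternals;
    import org.apache.beam.runners.core.StateInternalsFactory;
    import org.apache.beam.runners.core.SystemReduceFn;
    import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
    import org.apache.beam.runners.spark.structuredstreaming.translation.batch.functions.GroupAlsoByWindowViaOutputBufferFn;
    import org.apache.beam.sdk.coders.VarIntCoder;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.windowing.FixedWindows;
    import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
    import org.apache.beam.sdk.transforms.windowing.PaneInfo;
    import org.apache.beam.sdk.util.WindowedValue;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.WindowingStrategy;
    import org.joda.time.Duration;
    import org.joda.time.Instant;

    public class GroupAlsoByWindowDriverSketch {
      public static void main(String[] args) throws Exception {
        WindowingStrategy<Object, IntervalWindow> windowing =
            WindowingStrategy.of(FixedWindows.of(Duration.millis(10)));
        IntervalWindow w1 = new IntervalWindow(new Instant(0), new Instant(10));
        IntervalWindow w2 = new IntervalWindow(new Instant(10), new Instant(20));

        // Per-key in-memory state, mirroring InMemoryStateInternalsFactory in the translator.
        StateInternalsFactory<String> stateFactory = InMemoryStateInternals::forKey;
        GroupAlsoByWindowViaOutputBufferFn<String, Integer, IntervalWindow> fn =
            new GroupAlsoByWindowViaOutputBufferFn<>(
                windowing,
                stateFactory,
                SystemReduceFn.buffering(VarIntCoder.of()),
                new SerializablePipelineOptions(PipelineOptionsFactory.create()));

        Iterable<WindowedValue<Integer>> values =
            Arrays.asList(
                WindowedValue.of(1, new Instant(1), w1, PaneInfo.NO_FIRING),
                WindowedValue.of(2, new Instant(2), w1, PaneInfo.NO_FIRING),
                WindowedValue.of(3, new Instant(11), w2, PaneInfo.NO_FIRING));

        Iterator<WindowedValue<KV<String, Iterable<Integer>>>> out =
            fn.call(KV.of("user1", values));

        // Expected: one pane per window -- (user1, [1, 2]) for [0, 10) and (user1, [3])
        // for [10, 20) -- emitted when the watermark is advanced inside call().
        out.forEachRemaining(wv -> System.out.println(wv));
      }
    }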