Posted to commits@beam.apache.org by al...@apache.org on 2018/03/27 04:50:26 UTC

[beam] branch master updated (302b247 -> f93a332)

This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git.


    from 302b247  Merge pull request #4937: [BEAM-622] Add checkpointing tests for DoFnOperator and WindowDoFnOperator
     new fef8f54  [BEAM-3087] Make reader state update and element emission atomic
     new 0e44feb  [BEAM-2393] Make BoundedSource fault-tolerant
     new f93a332  Merge pull request #4895: [BEAM-2393] Make BoundedSource fault-tolerant in FlinkRunner Streaming mode

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../UnboundedReadFromBoundedSource.java            |   5 +-
 .../flink/FlinkStreamingTransformTranslators.java  | 138 ++++++++++-
 .../streaming/io/BoundedSourceWrapper.java         | 259 ---------------------
 .../streaming/io/UnboundedSourceWrapper.java       |  67 +++---
 .../flink/streaming/BoundedSourceRestoreTest.java  | 236 +++++++++++++++++++
 5 files changed, 404 insertions(+), 301 deletions(-)
 delete mode 100644 runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java
 create mode 100644 runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/BoundedSourceRestoreTest.java
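
The net effect of this change set: the dedicated BoundedSourceWrapper is
deleted, and bounded sources are instead adapted to unbounded ones so that
they flow through the checkpoint-aware UnboundedSourceWrapper. A minimal
sketch of the resulting wiring, using the types touched by these commits
("options" and "parallelism" stand in for values the translator supplies):

    import org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter;
    import org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter.Checkpoint;
    import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper;
    import org.apache.beam.sdk.io.BoundedSource;
    import org.apache.beam.sdk.io.CountingSource;

    BoundedSource<Long> bounded = CountingSource.upTo(1000);
    // Adapt the bounded source; Checkpoint<T> records the residual elements
    // and the residual source still to be read.
    BoundedToUnboundedSourceAdapter<Long> adapted =
        new BoundedToUnboundedSourceAdapter<>(bounded);
    // UnboundedSourceWrapper already participates in Flink checkpointing,
    // so the adapted source can be snapshotted and restored like any other.
    UnboundedSourceWrapper<Long, Checkpoint<Long>> wrapper =
        new UnboundedSourceWrapper<>("stepName", options, adapted, parallelism);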

-- 
To stop receiving notification emails like this one, please contact
aljoscha@apache.org.

[beam] 03/03: Merge pull request #4895: [BEAM-2393] Make BoundedSource fault-tolerant in FlinkRunner Streaming mode

Posted by al...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit f93a332a20857f425fa1c1b69d2423f3c2e7880d
Merge: 302b247 0e44feb
Author: Aljoscha Krettek <al...@gmail.com>
AuthorDate: Tue Mar 27 06:49:21 2018 +0200

    Merge pull request #4895: [BEAM-2393] Make BoundedSource fault-tolerant in FlinkRunner Streaming mode

 .../UnboundedReadFromBoundedSource.java            |   5 +-
 .../flink/FlinkStreamingTransformTranslators.java  | 138 ++++++++++-
 .../streaming/io/BoundedSourceWrapper.java         | 259 ---------------------
 .../streaming/io/UnboundedSourceWrapper.java       |  67 +++---
 .../flink/streaming/BoundedSourceRestoreTest.java  | 236 +++++++++++++++++++
 5 files changed, 404 insertions(+), 301 deletions(-)


[beam] 02/03: [BEAM-2393] Make BoundedSource fault-tolerant

Posted by al...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 0e44feb120a73c36116b2bd6b89e4f1676d7266f
Author: Grzegorz Kołakowski <gr...@getindata.com>
AuthorDate: Wed Feb 21 11:11:53 2018 +0100

    [BEAM-2393] Make BoundedSource fault-tolerant
---
 .../UnboundedReadFromBoundedSource.java            |   5 +-
 .../flink/FlinkStreamingTransformTranslators.java  | 138 ++++++++++-
 .../streaming/io/BoundedSourceWrapper.java         | 259 ---------------------
 .../flink/streaming/BoundedSourceRestoreTest.java  | 236 +++++++++++++++++++
 4 files changed, 367 insertions(+), 271 deletions(-)
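
From a pipeline author's point of view, the change means a bounded source in
a streaming FlinkRunner job can now be checkpointed and restored. An
illustrative sketch (the FlinkPipelineOptions setters are real; the source
and the checkpoint interval are arbitrary examples):

    import org.apache.beam.runners.flink.FlinkPipelineOptions;
    import org.apache.beam.runners.flink.FlinkRunner;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.GenerateSequence;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    FlinkPipelineOptions options =
        PipelineOptionsFactory.as(FlinkPipelineOptions.class);
    options.setRunner(FlinkRunner.class);
    options.setStreaming(true);
    options.setCheckpointingInterval(1000L); // enable Flink checkpoints

    Pipeline p = Pipeline.create(options);
    // A bounded source in streaming mode: with this commit its reader state
    // is part of the snapshot, so a restored job resumes where it left off
    // instead of starting over.
    p.apply(GenerateSequence.from(0).to(1000000));
    p.run();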

diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
index 09acc82..88119d1 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnboundedReadFromBoundedSource.java
@@ -169,8 +169,11 @@ public class UnboundedReadFromBoundedSource<T> extends PTransform<PBegin, PColle
       return new CheckpointCoder<>(boundedSource.getDefaultOutputCoder());
     }
 
+    /**
+     * A marker representing the progress and state of a {@link BoundedToUnboundedSourceAdapter}.
+     */
     @VisibleForTesting
-    static class Checkpoint<T> implements UnboundedSource.CheckpointMark {
+    public static class Checkpoint<T> implements UnboundedSource.CheckpointMark {
       private final @Nullable List<TimestampedValue<T>> residualElements;
       private final @Nullable BoundedSource<T> residualSource;
 
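
Widening Checkpoint's visibility matters because code outside the package,
such as the new Flink test below, must name it as the checkpoint-mark type
parameter. For example (taken from BoundedSourceRestoreTest in this commit):

    UnboundedSourceWrapper<Long, Checkpoint<Long>> flinkWrapper =
        new UnboundedSourceWrapper<>("stepName", options, unboundedSource, numSplits);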
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
index 970ece1..74ca168 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
@@ -42,6 +42,7 @@ import org.apache.beam.runners.core.construction.ParDoTranslation;
 import org.apache.beam.runners.core.construction.ReadTranslation;
 import org.apache.beam.runners.core.construction.SplittableParDo;
 import org.apache.beam.runners.core.construction.TransformPayloadTranslatorRegistrar;
+import org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter;
 import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows;
 import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator;
@@ -51,7 +52,6 @@ import org.apache.beam.runners.flink.translation.wrappers.streaming.SingletonKey
 import org.apache.beam.runners.flink.translation.wrappers.streaming.SplittableDoFnOperator;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.WindowDoFnOperator;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.WorkItemKeySelector;
-import org.apache.beam.runners.flink.translation.wrappers.streaming.io.BoundedSourceWrapper;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.io.DedupingOperator;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper;
 import org.apache.beam.sdk.coders.Coder;
@@ -87,18 +87,27 @@ import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.functions.StoppableFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.GenericTypeInfo;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.datastream.KeyedStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.OutputTag;
 
@@ -276,8 +285,7 @@ class FlinkStreamingTransformTranslators {
       PCollection<T> output = context.getOutput(transform);
 
       TypeInformation<WindowedValue<T>> outputTypeInfo =
-          context.getTypeInfo(context.getOutput(transform));
-
+                    context.getTypeInfo(context.getOutput(transform));
 
       BoundedSource<T> rawSource;
       try {
@@ -289,24 +297,26 @@ class FlinkStreamingTransformTranslators {
       }
 
       String fullName = getCurrentTransformName(context);
+      UnboundedSource<T, ?> adaptedRawSource = new BoundedToUnboundedSourceAdapter<>(rawSource);
       DataStream<WindowedValue<T>> source;
       try {
-        BoundedSourceWrapper<T> sourceWrapper =
-            new BoundedSourceWrapper<>(
+        UnboundedSourceWrapperNoValueWithRecordId<T, ?> sourceWrapper =
+            new UnboundedSourceWrapperNoValueWithRecordId<>(
+                new UnboundedSourceWrapper<>(
                 fullName,
                 context.getPipelineOptions(),
-                rawSource,
-                context.getExecutionEnvironment().getParallelism());
+                adaptedRawSource,
+                context.getExecutionEnvironment().getParallelism())
+            );
         source = context
             .getExecutionEnvironment()
             .addSource(sourceWrapper)
-            .name(fullName).uid(fullName)
+            .name(fullName)
+            .uid(fullName)
             .returns(outputTypeInfo);
       } catch (Exception e) {
-        throw new RuntimeException(
-            "Error while translating BoundedSource: " + rawSource, e);
+        throw new RuntimeException("Error while translating BoundedSource: " + rawSource, e);
       }
-
       context.setOutputDataStream(output, source);
     }
   }
@@ -1259,4 +1269,110 @@ class FlinkStreamingTransformTranslators {
       return CreateStreamingFlinkView.CREATE_STREAMING_FLINK_VIEW_URN;
     }
   }
+
+  /**
+   * Wrapper for {@link UnboundedSourceWrapper}, which simplifies output type, namely, removes
+   * {@link ValueWithRecordId}.
+   */
+  private static class UnboundedSourceWrapperNoValueWithRecordId<
+      OutputT, CheckpointMarkT extends UnboundedSource.CheckpointMark>
+      extends RichParallelSourceFunction<WindowedValue<OutputT>>
+      implements ProcessingTimeCallback, StoppableFunction,
+      CheckpointListener, CheckpointedFunction {
+
+    private final UnboundedSourceWrapper<OutputT, CheckpointMarkT> unboundedSourceWrapper;
+
+    private UnboundedSourceWrapperNoValueWithRecordId(
+        UnboundedSourceWrapper<OutputT, CheckpointMarkT> unboundedSourceWrapper) {
+      this.unboundedSourceWrapper = unboundedSourceWrapper;
+    }
+
+    @Override
+    public void open(Configuration parameters) throws Exception {
+      unboundedSourceWrapper.setRuntimeContext(getRuntimeContext());
+      unboundedSourceWrapper.open(parameters);
+    }
+
+    @Override
+    public void run(SourceContext<WindowedValue<OutputT>> ctx) throws Exception {
+      unboundedSourceWrapper.run(new SourceContextWrapper(ctx));
+    }
+
+    @Override
+    public void initializeState(FunctionInitializationContext context) throws Exception {
+      unboundedSourceWrapper.initializeState(context);
+    }
+
+    @Override
+    public void snapshotState(FunctionSnapshotContext context) throws Exception {
+      unboundedSourceWrapper.snapshotState(context);
+    }
+
+    @Override
+    public void notifyCheckpointComplete(long checkpointId) throws Exception {
+      unboundedSourceWrapper.notifyCheckpointComplete(checkpointId);
+    }
+
+    @Override
+    public void stop() {
+      unboundedSourceWrapper.stop();
+    }
+
+    @Override
+    public void cancel() {
+      unboundedSourceWrapper.cancel();
+    }
+
+    @Override
+    public void onProcessingTime(long timestamp) throws Exception {
+      unboundedSourceWrapper.onProcessingTime(timestamp);
+    }
+
+    private final class SourceContextWrapper implements
+        SourceContext<WindowedValue<ValueWithRecordId<OutputT>>> {
+
+      private final SourceContext<WindowedValue<OutputT>> ctx;
+
+      private SourceContextWrapper(SourceContext<WindowedValue<OutputT>> ctx) {
+        this.ctx = ctx;
+      }
+
+      @Override
+      public void collect(WindowedValue<ValueWithRecordId<OutputT>> element) {
+        OutputT originalValue = element.getValue().getValue();
+        WindowedValue<OutputT> output = WindowedValue
+            .of(originalValue, element.getTimestamp(), element.getWindows(), element.getPane());
+        ctx.collect(output);
+      }
+
+      @Override
+      public void collectWithTimestamp(WindowedValue<ValueWithRecordId<OutputT>> element,
+          long timestamp) {
+        OutputT originalValue = element.getValue().getValue();
+        WindowedValue<OutputT> output = WindowedValue
+            .of(originalValue, element.getTimestamp(), element.getWindows(), element.getPane());
+        ctx.collectWithTimestamp(output, timestamp);
+      }
+
+      @Override
+      public void emitWatermark(Watermark mark) {
+        ctx.emitWatermark(mark);
+      }
+
+      @Override
+      public void markAsTemporarilyIdle() {
+        ctx.markAsTemporarilyIdle();
+      }
+
+      @Override
+      public Object getCheckpointLock() {
+        return ctx.getCheckpointLock();
+      }
+
+      @Override
+      public void close() {
+        ctx.close();
+      }
+    }
+  }
 }
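
A note on why the wrapper above re-declares CheckpointedFunction,
CheckpointListener and the other interfaces rather than merely holding a
checkpointable delegate: Flink discovers these hooks through instanceof
checks on the source function itself, roughly like this (a simplified
sketch of the runtime's dispatch, not actual Flink code):

    // Only fires when the user function itself implements the interface,
    // which is why UnboundedSourceWrapperNoValueWithRecordId implements it
    // and forwards each call to the wrapped UnboundedSourceWrapper.
    if (userFunction instanceof CheckpointedFunction) {
        ((CheckpointedFunction) userFunction).snapshotState(context);
    }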
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java
deleted file mode 100644
index 6db5426..0000000
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java
+++ /dev/null
@@ -1,259 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.wrappers.streaming.io;
-
-import com.google.common.annotations.VisibleForTesting;
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
-import org.apache.beam.runners.flink.FlinkPipelineOptions;
-import org.apache.beam.runners.flink.metrics.FlinkMetricContainer;
-import org.apache.beam.runners.flink.metrics.ReaderInvocationUtil;
-import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.flink.api.common.functions.StoppableFunction;
-import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.joda.time.Instant;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Wrapper for executing {@link BoundedSource BoundedSources} as a Flink Source.
- */
-public class BoundedSourceWrapper<OutputT>
-    extends RichParallelSourceFunction<WindowedValue<OutputT>>
-    implements StoppableFunction {
-
-  private static final Logger LOG = LoggerFactory.getLogger(BoundedSourceWrapper.class);
-
-  private String stepName;
-  /**
-   * Keep the options so that we can initialize the readers.
-   */
-  private final SerializablePipelineOptions serializedOptions;
-
-  /**
-   * The split sources. We split them in the constructor to ensure that all parallel
-   * sources are consistent about the split sources.
-   */
-  private List<? extends BoundedSource<OutputT>> splitSources;
-
-  /**
-   * Make it a field so that we can access it in {@link #close()}.
-   */
-  private transient List<BoundedSource.BoundedReader<OutputT>> readers;
-
-  /**
-   * Initialize here and not in run() to prevent races where we cancel a job before run() is
-   * ever called or run() is called after cancel().
-   */
-  private volatile boolean isRunning = true;
-
-  @SuppressWarnings("unchecked")
-  public BoundedSourceWrapper(
-      String stepName,
-      PipelineOptions pipelineOptions,
-      BoundedSource<OutputT> source,
-      int parallelism) throws Exception {
-    this.stepName = stepName;
-    this.serializedOptions = new SerializablePipelineOptions(pipelineOptions);
-
-    long desiredBundleSize = source.getEstimatedSizeBytes(pipelineOptions) / parallelism;
-
-    // get the splits early. we assume that the generated splits are stable,
-    // this is necessary so that the mapping of state to source is correct
-    // when restoring
-    splitSources = source.split(desiredBundleSize, pipelineOptions);
-  }
-
-  @Override
-  public void run(SourceContext<WindowedValue<OutputT>> ctx) throws Exception {
-
-    // figure out which split sources we're responsible for
-    int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
-    int numSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
-
-    List<BoundedSource<OutputT>> localSources = new ArrayList<>();
-
-    for (int i = 0; i < splitSources.size(); i++) {
-      if (i % numSubtasks == subtaskIndex) {
-        localSources.add(splitSources.get(i));
-      }
-    }
-
-    LOG.info("Bounded Flink Source {}/{} is reading from sources: {}",
-        subtaskIndex,
-        numSubtasks,
-        localSources);
-
-    FlinkMetricContainer metricContainer = new FlinkMetricContainer(getRuntimeContext());
-
-    ReaderInvocationUtil<OutputT, BoundedSource.BoundedReader<OutputT>> readerInvoker =
-        new ReaderInvocationUtil<>(
-            stepName,
-            serializedOptions.get(),
-            metricContainer);
-
-    readers = new ArrayList<>();
-    // initialize readers from scratch
-    for (BoundedSource<OutputT> source : localSources) {
-      readers.add(source.createReader(serializedOptions.get()));
-    }
-
-   if (readers.size() == 1) {
-      // the easy case, we just read from one reader
-      BoundedSource.BoundedReader<OutputT> reader = readers.get(0);
-
-      boolean dataAvailable = readerInvoker.invokeStart(reader);
-      if (dataAvailable) {
-        emitElement(ctx, reader);
-      }
-
-      while (isRunning) {
-        dataAvailable = readerInvoker.invokeAdvance(reader);
-
-        if (dataAvailable)  {
-          emitElement(ctx, reader);
-        } else {
-          break;
-        }
-      }
-    } else {
-      // a bit more complicated, we are responsible for several readers
-      // loop through them and sleep if none of them had any data
-
-      int currentReader = 0;
-
-      // start each reader and emit data if immediately available
-      for (BoundedSource.BoundedReader<OutputT> reader : readers) {
-        boolean dataAvailable = readerInvoker.invokeStart(reader);
-        if (dataAvailable) {
-          emitElement(ctx, reader);
-        }
-      }
-
-      // a flag telling us whether any of the readers had data
-      // if no reader had data, sleep for bit
-      boolean hadData = false;
-      while (isRunning && !readers.isEmpty()) {
-        BoundedSource.BoundedReader<OutputT> reader = readers.get(currentReader);
-        boolean dataAvailable = readerInvoker.invokeAdvance(reader);
-
-        if (dataAvailable) {
-          emitElement(ctx, reader);
-          hadData = true;
-        } else {
-          readers.remove(currentReader);
-          currentReader--;
-          if (readers.isEmpty()) {
-            break;
-          }
-        }
-
-        currentReader = (currentReader + 1) % readers.size();
-        if (currentReader == 0 && !hadData) {
-          Thread.sleep(50);
-        } else if (currentReader == 0) {
-          hadData = false;
-        }
-      }
-
-    }
-
-    // emit final Long.MAX_VALUE watermark, just to be sure
-    ctx.emitWatermark(new Watermark(Long.MAX_VALUE));
-
-    FlinkPipelineOptions options = serializedOptions.get().as(FlinkPipelineOptions.class);
-    if (!options.isShutdownSourcesOnFinalWatermark()) {
-      // do nothing, but still look busy ...
-      // we can't return here since Flink requires that all operators stay up,
-      // otherwise checkpointing would not work correctly anymore
-      //
-      // See https://issues.apache.org/jira/browse/FLINK-2491 for progress on this issue
-
-      // wait until this is canceled
-      final Object waitLock = new Object();
-      while (isRunning) {
-        try {
-          // Flink will interrupt us at some point
-          //noinspection SynchronizationOnLocalVariableOrMethodParameter
-          synchronized (waitLock) {
-            // don't wait indefinitely, in case something goes horribly wrong
-            waitLock.wait(1000);
-          }
-        } catch (InterruptedException e) {
-          if (!isRunning) {
-            // restore the interrupted state, and fall through the loop
-            Thread.currentThread().interrupt();
-          }
-        }
-      }
-    }
-  }
-
-  /**
-   * Emit the current element from the given Reader. The reader is guaranteed to have data.
-   */
-  private void emitElement(
-      SourceContext<WindowedValue<OutputT>> ctx,
-      BoundedSource.BoundedReader<OutputT> reader) {
-    // make sure that reader state update and element emission are atomic
-    // with respect to snapshots
-    synchronized (ctx.getCheckpointLock()) {
-
-      OutputT item = reader.getCurrent();
-      Instant timestamp = reader.getCurrentTimestamp();
-
-      WindowedValue<OutputT> windowedValue =
-          WindowedValue.of(item, timestamp, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING);
-      ctx.collectWithTimestamp(windowedValue, timestamp.getMillis());
-    }
-  }
-
-  @Override
-  public void close() throws Exception {
-    super.close();
-    if (readers != null) {
-      for (BoundedSource.BoundedReader<OutputT> reader: readers) {
-        reader.close();
-      }
-    }
-  }
-
-  @Override
-  public void cancel() {
-    isRunning = false;
-  }
-
-  @Override
-  public void stop() {
-    this.isRunning = false;
-  }
-
-  /**
-   * Visible so that we can check this in tests. Must not be used for anything else.
-   */
-  @VisibleForTesting
-  public List<? extends BoundedSource<OutputT>> getSplitSources() {
-    return splitSources;
-  }
-}
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/BoundedSourceRestoreTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/BoundedSourceRestoreTest.java
new file mode 100644
index 0000000..7701602
--- /dev/null
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/BoundedSourceRestoreTest.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.streaming;
+
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter;
+import org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter.Checkpoint;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.CountingSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.ValueWithRecordId;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.StreamSource;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
+import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
+import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
+import org.apache.flink.util.OutputTag;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * Test for bounded source restore in streaming mode.
+ */
+@RunWith(Parameterized.class)
+public class BoundedSourceRestoreTest {
+
+  private final int numTasks;
+  private final int numSplits;
+
+  public BoundedSourceRestoreTest(int numTasks, int numSplits) {
+    this.numTasks = numTasks;
+    this.numSplits = numSplits;
+  }
+
+  @Parameterized.Parameters
+  public static Collection<Object[]> data() {
+    /* Parameters for initializing the tests: {numTasks, numSplits} */
+    return Arrays.asList(new Object[][]{
+        {1, 1},
+        {1, 2},
+        {1, 4},
+    });
+  }
+
+  @Test
+  public void testRestore() throws Exception {
+    final int numElements = 102;
+    final int firstBatchSize = 23;
+    final int secondBatchSize = numElements - firstBatchSize;
+    final Set<Long> emittedElements = new HashSet<>();
+    final Object checkpointLock = new Object();
+    PipelineOptions options = PipelineOptionsFactory.create();
+
+    // bounded source wrapped as unbounded source
+    BoundedSource<Long> source = CountingSource.upTo(numElements);
+    BoundedToUnboundedSourceAdapter<Long> unboundedSource =
+        new BoundedToUnboundedSourceAdapter<>(source);
+    UnboundedSourceWrapper<Long, Checkpoint<Long>> flinkWrapper = new UnboundedSourceWrapper<>(
+        "stepName", options, unboundedSource, numSplits);
+
+    StreamSource<WindowedValue<ValueWithRecordId<Long>>,
+        UnboundedSourceWrapper<Long, Checkpoint<Long>>> sourceOperator =
+        new StreamSource<>(flinkWrapper);
+
+    AbstractStreamOperatorTestHarness<WindowedValue<ValueWithRecordId<Long>>> testHarness =
+        new AbstractStreamOperatorTestHarness<>(sourceOperator,
+            numTasks /* max parallelism */,
+            numTasks /* parallelism */,
+            0 /* subtask index */);
+    testHarness.setTimeCharacteristic(TimeCharacteristic.EventTime);
+
+    // the first half of elements is read
+    boolean readFirstBatchOfElements = false;
+    try {
+      testHarness.open();
+      sourceOperator.run(checkpointLock,
+          new TestStreamStatusMaintainer(),
+          new PartialCollector<>(emittedElements, firstBatchSize)
+      );
+    } catch (SuccessException e) {
+      // success
+      readFirstBatchOfElements = true;
+    }
+    assertTrue("Did not successfully read first batch of elements.", readFirstBatchOfElements);
+
+    // draw a snapshot
+    OperatorStateHandles snapshot = testHarness.snapshot(0, 0);
+
+    // finalize checkpoint
+    final ArrayList<Integer> finalizeList = new ArrayList<>();
+    TestCountingSource.setFinalizeTracker(finalizeList);
+    testHarness.notifyOfCompletedCheckpoint(0);
+
+    // create a completely new source but restore from the snapshot
+    BoundedSource<Long> restoredSource = CountingSource.upTo(numElements);
+    BoundedToUnboundedSourceAdapter<Long> restoredUnboundedSource =
+        new BoundedToUnboundedSourceAdapter<>(restoredSource);
+    UnboundedSourceWrapper<Long, Checkpoint<Long>> restoredFlinkWrapper =
+        new UnboundedSourceWrapper<>("stepName", options, restoredUnboundedSource, numSplits);
+    StreamSource<WindowedValue<ValueWithRecordId<Long>>,
+        UnboundedSourceWrapper<Long, Checkpoint<Long>>> restoredSourceOperator =
+        new StreamSource<>(restoredFlinkWrapper);
+
+    // set parallelism to 1 to ensure that our testing operator gets all checkpointed state
+    AbstractStreamOperatorTestHarness<WindowedValue<ValueWithRecordId<Long>>>
+        restoredTestHarness =
+        new AbstractStreamOperatorTestHarness<>(
+            restoredSourceOperator,
+            numTasks /* max parallelism */,
+            1 /* parallelism */,
+            0 /* subtask index */);
+
+    restoredTestHarness.setTimeCharacteristic(TimeCharacteristic.EventTime);
+
+    // restore snapshot
+    restoredTestHarness.initializeState(snapshot);
+
+    // run again and verify that we see the other elements
+    boolean readSecondBatchOfElements = false;
+    try {
+      restoredTestHarness.open();
+      restoredSourceOperator.run(checkpointLock,
+          new TestStreamStatusMaintainer(),
+          new PartialCollector<>(emittedElements, secondBatchSize)
+      );
+    } catch (SuccessException e) {
+      // success
+      readSecondBatchOfElements = true;
+    }
+    assertTrue("Did not successfully read second batch of elements.", readSecondBatchOfElements);
+
+    // verify that we saw all NUM_ELEMENTS elements
+    assertTrue(emittedElements.size() == numElements);
+  }
+
+  /**
+   * A special {@link RuntimeException} that we throw to signal that the test was successful.
+   */
+  private static class SuccessException extends RuntimeException {
+
+  }
+
+  /**
+   * A collector which consumes only specified number of elements.
+   */
+  private static class PartialCollector<T>
+      implements Output<StreamRecord<WindowedValue<ValueWithRecordId<T>>>> {
+
+    private final Set<T> emittedElements;
+    private final int elementsToConsumeLimit;
+
+    private int count = 0;
+
+    private PartialCollector(Set<T> emittedElements, int elementsToConsumeLimit) {
+      this.emittedElements = emittedElements;
+      this.elementsToConsumeLimit = elementsToConsumeLimit;
+    }
+
+    @Override
+    public void emitWatermark(Watermark watermark) {
+
+    }
+
+    @Override
+    public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> streamRecord) {
+      collect((StreamRecord) streamRecord);
+    }
+
+    @Override
+    public void emitLatencyMarker(LatencyMarker latencyMarker) {
+
+    }
+
+    @Override
+    public void collect(StreamRecord<WindowedValue<ValueWithRecordId<T>>> record) {
+      emittedElements.add(record.getValue().getValue().getValue());
+      count++;
+      if (count >= elementsToConsumeLimit) {
+        throw new SuccessException();
+      }
+    }
+
+    @Override
+    public void close() {
+
+    }
+  }
+
+  private static final class TestStreamStatusMaintainer implements StreamStatusMaintainer {
+
+    StreamStatus currentStreamStatus = StreamStatus.ACTIVE;
+
+    @Override
+    public void toggleStreamStatus(StreamStatus streamStatus) {
+      if (!currentStreamStatus.equals(streamStatus)) {
+        currentStreamStatus = streamStatus;
+      }
+    }
+
+    @Override
+    public StreamStatus getStreamStatus() {
+      return currentStreamStatus;
+    }
+  }
+
+}


[beam] 01/03: [BEAM-3087] Make reader state update and element emission atomic

Posted by al...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git

commit fef8f54be417be88730ebf2d0e8db8c55fbd013f
Author: Grzegorz Kołakowski <gr...@getindata.com>
AuthorDate: Wed Feb 21 10:31:53 2018 +0100

    [BEAM-3087] Make reader state update and element emission atomic
    
    Reader advancement should be considered as reader state update too.
    Therefore, the reader's advancement and element emission are in the
    same synchronized section.
---
 .../streaming/io/UnboundedSourceWrapper.java       | 67 ++++++++++++----------
 1 file changed, 37 insertions(+), 30 deletions(-)
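
The pattern, in brief (simplified from the hunks below): the reader advance
and the element emission both happen while holding Flink's checkpoint lock,
so a snapshot can never observe a reader that has advanced past an element
it has not yet emitted:

    synchronized (ctx.getCheckpointLock()) {
      // Advancing mutates reader state; a snapshot taken between advance
      // and emit would silently drop the current element. Holding the lock
      // across both makes the pair atomic with respect to checkpoints.
      boolean dataAvailable = readerInvoker.invokeAdvance(reader);
      if (dataAvailable) {
        emitElement(ctx, reader);
      }
    }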

diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
index fc23c01..3f04b6c 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
@@ -186,7 +186,7 @@ public class UnboundedSourceWrapper<
 
     if (isRestored) {
       // restore the splitSources from the checkpoint to ensure consistent ordering
-      for (KV<? extends UnboundedSource<OutputT, CheckpointMarkT>, CheckpointMarkT> restored:
+      for (KV<? extends UnboundedSource<OutputT, CheckpointMarkT>, CheckpointMarkT> restored :
           stateForCheckpoint.get()) {
         localSplitSources.add(restored.getKey());
         localReaders.add(restored.getKey().createReader(
@@ -229,19 +229,25 @@ public class UnboundedSourceWrapper<
       // the easy case, we just read from one reader
       UnboundedSource.UnboundedReader<OutputT> reader = localReaders.get(0);
 
-      boolean dataAvailable = readerInvoker.invokeStart(reader);
-      if (dataAvailable) {
-        emitElement(ctx, reader);
+      synchronized (ctx.getCheckpointLock()) {
+        boolean dataAvailable = readerInvoker.invokeStart(reader);
+        if (dataAvailable) {
+          emitElement(ctx, reader);
+        }
       }
 
       setNextWatermarkTimer(this.runtimeContext);
 
       while (isRunning) {
-        dataAvailable = readerInvoker.invokeAdvance(reader);
+        boolean dataAvailable;
+        synchronized (ctx.getCheckpointLock()) {
+          dataAvailable = readerInvoker.invokeAdvance(reader);
 
-        if (dataAvailable)  {
-          emitElement(ctx, reader);
-        } else {
+          if (dataAvailable) {
+            emitElement(ctx, reader);
+          }
+        }
+        if (!dataAvailable) {
           Thread.sleep(50);
         }
       }
@@ -254,9 +260,11 @@ public class UnboundedSourceWrapper<
 
       // start each reader and emit data if immediately available
       for (UnboundedSource.UnboundedReader<OutputT> reader : localReaders) {
-        boolean dataAvailable = readerInvoker.invokeStart(reader);
-        if (dataAvailable) {
-          emitElement(ctx, reader);
+        synchronized (ctx.getCheckpointLock()) {
+          boolean dataAvailable = readerInvoker.invokeStart(reader);
+          if (dataAvailable) {
+            emitElement(ctx, reader);
+          }
         }
       }
 
@@ -267,11 +275,13 @@ public class UnboundedSourceWrapper<
       boolean hadData = false;
       while (isRunning) {
         UnboundedSource.UnboundedReader<OutputT> reader = localReaders.get(currentReader);
-        boolean dataAvailable = readerInvoker.invokeAdvance(reader);
 
-        if (dataAvailable) {
-          emitElement(ctx, reader);
-          hadData = true;
+        synchronized (ctx.getCheckpointLock()) {
+          boolean dataAvailable = readerInvoker.invokeAdvance(reader);
+          if (dataAvailable) {
+            emitElement(ctx, reader);
+            hadData = true;
+          }
         }
 
         currentReader = (currentReader + 1) % numReaders;
@@ -321,24 +331,21 @@ public class UnboundedSourceWrapper<
       UnboundedSource.UnboundedReader<OutputT> reader) {
     // make sure that reader state update and element emission are atomic
     // with respect to snapshots
-    synchronized (ctx.getCheckpointLock()) {
-
-      OutputT item = reader.getCurrent();
-      byte[] recordId = reader.getCurrentRecordId();
-      Instant timestamp = reader.getCurrentTimestamp();
-
-      WindowedValue<ValueWithRecordId<OutputT>> windowedValue =
-          WindowedValue.of(new ValueWithRecordId<>(item, recordId), timestamp,
-              GlobalWindow.INSTANCE, PaneInfo.NO_FIRING);
-      ctx.collectWithTimestamp(windowedValue, timestamp.getMillis());
-    }
+    OutputT item = reader.getCurrent();
+    byte[] recordId = reader.getCurrentRecordId();
+    Instant timestamp = reader.getCurrentTimestamp();
+
+    WindowedValue<ValueWithRecordId<OutputT>> windowedValue =
+        WindowedValue.of(new ValueWithRecordId<>(item, recordId), timestamp,
+            GlobalWindow.INSTANCE, PaneInfo.NO_FIRING);
+    ctx.collectWithTimestamp(windowedValue, timestamp.getMillis());
   }
 
   @Override
   public void close() throws Exception {
     super.close();
     if (localReaders != null) {
-      for (UnboundedSource.UnboundedReader<OutputT> reader: localReaders) {
+      for (UnboundedSource.UnboundedReader<OutputT> reader : localReaders) {
         reader.close();
       }
     }
@@ -394,8 +401,8 @@ public class UnboundedSourceWrapper<
       int diff = pendingCheckpoints.size() - MAX_NUMBER_PENDING_CHECKPOINTS;
       if (diff >= 0) {
         for (Iterator<Long> iterator = pendingCheckpoints.keySet().iterator();
-             diff >= 0;
-             diff--) {
+            diff >= 0;
+            diff--) {
           iterator.next();
           iterator.remove();
         }
@@ -434,7 +441,7 @@ public class UnboundedSourceWrapper<
       synchronized (context.getCheckpointLock()) {
         // find minimum watermark over all localReaders
         long watermarkMillis = Long.MAX_VALUE;
-        for (UnboundedSource.UnboundedReader<OutputT> reader: localReaders) {
+        for (UnboundedSource.UnboundedReader<OutputT> reader : localReaders) {
           Instant watermark = reader.getWatermark();
           if (watermark != null) {
             watermarkMillis = Math.min(watermark.getMillis(), watermarkMillis);
