You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by mx...@apache.org on 2016/03/23 19:35:13 UTC

[08/10] incubator-beam git commit: [BEAM-144] solve reader serialization issue

[BEAM-144] solve reader serialization issue

The UnboundedReader is now created at runtime instead of in the
constructor, which requires us to keep a copy of the PipelineOptions.
This should be fine here because we are at the lowest level of the
execution stack.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ac5a1e84
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ac5a1e84
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ac5a1e84

Branch: refs/heads/master
Commit: ac5a1e848f56916faca130c2a13ad82a960ad635
Parents: e39b5d9
Author: Maximilian Michels <mx...@apache.org>
Authored: Wed Mar 23 16:18:04 2016 +0100
Committer: Maximilian Michels <mx...@apache.org>
Committed: Wed Mar 23 19:29:11 2016 +0100

----------------------------------------------------------------------
 .../streaming/io/UnboundedSourceWrapper.java    | 87 ++++++++++++++------
 1 file changed, 62 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ac5a1e84/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
index 97084cf..43cf4b2 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.runners.flink.translation.wrappers.streaming.io;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.cloud.dataflow.sdk.io.Read;
 import com.google.cloud.dataflow.sdk.io.UnboundedSource;
 import com.google.cloud.dataflow.sdk.options.PipelineOptions;
@@ -31,26 +32,36 @@ import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.operators.Triggerable;
 import org.joda.time.Instant;
 
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
 /**
- * A wrapper for Beam's unbounded sources. This class wraps around a source implementing the {@link com.google.cloud.dataflow.sdk.io.Read.Unbounded}
- * interface.
+ * A wrapper for Beam's unbounded sources. This class wraps the {@link UnboundedSource} of a
+ * {@link com.google.cloud.dataflow.sdk.io.Read.Unbounded} transform.
  *
- *</p>
- * For now we support non-parallel, not checkpointed sources.
+ * For now, only non-parallel sources are supported; checkpointing is work in progress.
  * */
 public class UnboundedSourceWrapper<T> extends RichSourceFunction<WindowedValue<T>> implements Triggerable {
 
   private final String name;
-  private final UnboundedSource.UnboundedReader<T> reader;
+  private final UnboundedSource<T, ?> source;
 
   private StreamingRuntimeContext runtime = null;
   private StreamSource.ManualWatermarkContext<WindowedValue<T>> context = null;
 
   private volatile boolean isRunning = false;
 
-  public UnboundedSourceWrapper(PipelineOptions options, Read.Unbounded<T> transform) {
+  /** Serialized using custom Java serialization via Jackson */
+  private transient PipelineOptions pipelineOptions;
+
+  /** Created at runtime in run(); transient, so it is never serialized. */
+  private transient UnboundedSource.UnboundedReader<T> reader;
+
+  public UnboundedSourceWrapper(PipelineOptions pipelineOptions, Read.Unbounded<T> transform) {
     this.name = transform.getName();
-    this.reader = transform.getSource().createReader(options, null);
+    this.pipelineOptions = pipelineOptions;  // kept so run() can create the reader later
+    this.source = transform.getSource();  // only the source is stored; the reader is built in run()
   }
 
   public String getName() {
@@ -67,40 +78,51 @@ public class UnboundedSourceWrapper<T> extends RichSourceFunction<WindowedValue<
   @Override
   public void run(SourceContext<WindowedValue<T>> ctx) throws Exception {
     if (!(ctx instanceof StreamSource.ManualWatermarkContext)) {
-      throw new RuntimeException("We assume that all sources in Dataflow are EventTimeSourceFunction. " +
-          "Apparently " + this.name + " is not. Probably you should consider writing your own Wrapper for this source.");
+      throw new RuntimeException(
+          "We assume that all sources in Dataflow are EventTimeSourceFunction. " +
+              "Apparently " + this.name + " is not. " +
+              "Probably you should consider writing your own Wrapper for this source.");
     }
 
     context = (StreamSource.ManualWatermarkContext<WindowedValue<T>>) ctx;
     runtime = (StreamingRuntimeContext) getRuntimeContext();
 
-    this.isRunning = true;
+    isRunning = true;
+    // Created here at runtime, not in the constructor, so the reader itself is never serialized.
+    reader = source.createReader(pipelineOptions, null);
+    // NOTE(review): start() and setNextWatermarkTimer() run before the try below; the reader is not closed if either throws.
     boolean inputAvailable = reader.start();
 
     setNextWatermarkTimer(this.runtime);
 
-    while (isRunning) {
-
-      while (!inputAvailable && isRunning) {
-        // wait a bit until we retry to pull more records
-        Thread.sleep(50);
-        inputAvailable = reader.advance();
-      }

-      if (inputAvailable) {
+    try {
 
-        // get it and its timestamp from the source
-        T item = reader.getCurrent();
-        Instant timestamp = reader.getCurrentTimestamp();
+      while (isRunning) {
 
-        // write it to the output collector
-        synchronized (ctx.getCheckpointLock()) {
-          context.collectWithTimestamp(makeWindowedValue(item, timestamp), timestamp.getMillis());
+        if (!inputAvailable && isRunning) {
+          // No record available: back off for 50 ms, then poll the reader once more.
+          Thread.sleep(50);
+          inputAvailable = reader.advance();
         }
 
-        inputAvailable = reader.advance();
+        if (inputAvailable) {
+
+          // Fetch the current record and its timestamp from the reader.
+          T item = reader.getCurrent();
+          Instant timestamp = reader.getCurrentTimestamp();
+
+          // Emit the record while holding the source context's checkpoint lock.
+          synchronized (ctx.getCheckpointLock()) {
+            context.collectWithTimestamp(makeWindowedValue(item, timestamp), timestamp.getMillis());
+          }
+          // Move to the next record; false means nothing is available right now.
+          inputAvailable = reader.advance();
+        }
       }
 
+    } finally {
+      reader.close();  // release the reader however the loop exits, including via an exception
     }
   }
 
@@ -131,4 +153,19 @@ public class UnboundedSourceWrapper<T> extends RichSourceFunction<WindowedValue<
   private long getTimeToNextWaternark(long watermarkInterval) {
     return System.currentTimeMillis() + watermarkInterval;
   }
+
+
+  // Store the PipelineOptions as a JSON byte[]; Jackson writing to `out` directly would close it.
+  private void writeObject(ObjectOutputStream out) throws IOException {
+    out.defaultWriteObject();
+    ObjectMapper mapper = new ObjectMapper();
+    out.writeObject(mapper.writeValueAsBytes(pipelineOptions));
+  }
+
+  // Rebuild the PipelineOptions (needed by run() to create the reader) from writeObject's byte[].
+  private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+    in.defaultReadObject();
+    ObjectMapper mapper = new ObjectMapper();
+    pipelineOptions = mapper.readValue((byte[]) in.readObject(), PipelineOptions.class);
+  }
 }