You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to commits@beam.apache.org by mx...@apache.org on 2016/03/23 19:35:13 UTC
[08/10] incubator-beam git commit: [BEAM-144] solve reader serialization issue
[BEAM-144] solve reader serialization issue
The UnboundedReader is now created at runtime (in run()) instead of in the
constructor, so the reader no longer has to be serialized with the wrapper. This requires
us to keep a copy of the PipelineOptions.
because we are at the lowest point of the execution stack.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ac5a1e84
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ac5a1e84
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ac5a1e84
Branch: refs/heads/master
Commit: ac5a1e848f56916faca130c2a13ad82a960ad635
Parents: e39b5d9
Author: Maximilian Michels <mx...@apache.org>
Authored: Wed Mar 23 16:18:04 2016 +0100
Committer: Maximilian Michels <mx...@apache.org>
Committed: Wed Mar 23 19:29:11 2016 +0100
----------------------------------------------------------------------
.../streaming/io/UnboundedSourceWrapper.java | 87 ++++++++++++++------
1 file changed, 62 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ac5a1e84/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
index 97084cf..43cf4b2 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.runners.flink.translation.wrappers.streaming.io;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.cloud.dataflow.sdk.io.Read;
import com.google.cloud.dataflow.sdk.io.UnboundedSource;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
@@ -31,26 +32,36 @@ import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.operators.Triggerable;
import org.joda.time.Instant;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+
/**
- * A wrapper for Beam's unbounded sources. This class wraps around a source implementing the {@link com.google.cloud.dataflow.sdk.io.Read.Unbounded}
- * interface.
+ * A wrapper for Beam's unbounded sources. This class wraps around a source implementing the
+ * {@link com.google.cloud.dataflow.sdk.io.Read.Unbounded} interface.
*
- *</p>
- * For now we support non-parallel, not checkpointed sources.
+ * For now we support non-parallel sources, checkpointing is WIP.
* */
public class UnboundedSourceWrapper<T> extends RichSourceFunction<WindowedValue<T>> implements Triggerable {
private final String name;
- private final UnboundedSource.UnboundedReader<T> reader;
+ private final UnboundedSource<T, ?> source;
private StreamingRuntimeContext runtime = null;
private StreamSource.ManualWatermarkContext<WindowedValue<T>> context = null;
private volatile boolean isRunning = false;
- public UnboundedSourceWrapper(PipelineOptions options, Read.Unbounded<T> transform) {
+ /** Serialized using custom Java serialization via Jackson */
+ private transient PipelineOptions pipelineOptions;
+
+ /** Instantiated during runtime **/
+ private transient UnboundedSource.UnboundedReader<T> reader;
+
+ public UnboundedSourceWrapper(PipelineOptions pipelineOptions, Read.Unbounded<T> transform) {
this.name = transform.getName();
- this.reader = transform.getSource().createReader(options, null);
+ this.pipelineOptions = pipelineOptions;
+ this.source = transform.getSource();
}
public String getName() {
@@ -67,40 +78,51 @@ public class UnboundedSourceWrapper<T> extends RichSourceFunction<WindowedValue<
@Override
public void run(SourceContext<WindowedValue<T>> ctx) throws Exception {
if (!(ctx instanceof StreamSource.ManualWatermarkContext)) {
- throw new RuntimeException("We assume that all sources in Dataflow are EventTimeSourceFunction. " +
- "Apparently " + this.name + " is not. Probably you should consider writing your own Wrapper for this source.");
+ throw new RuntimeException(
+ "We assume that all sources in Dataflow are EventTimeSourceFunction. " +
+ "Apparently " + this.name + " is not. " +
+ "Probably you should consider writing your own Wrapper for this source.");
}
context = (StreamSource.ManualWatermarkContext<WindowedValue<T>>) ctx;
runtime = (StreamingRuntimeContext) getRuntimeContext();
- this.isRunning = true;
+ isRunning = true;
+
+ reader = source.createReader(pipelineOptions, null);
+
boolean inputAvailable = reader.start();
setNextWatermarkTimer(this.runtime);
- while (isRunning) {
-
- while (!inputAvailable && isRunning) {
- // wait a bit until we retry to pull more records
- Thread.sleep(50);
- inputAvailable = reader.advance();
- }
- if (inputAvailable) {
+ try {
- // get it and its timestamp from the source
- T item = reader.getCurrent();
- Instant timestamp = reader.getCurrentTimestamp();
+ while (isRunning) {
- // write it to the output collector
- synchronized (ctx.getCheckpointLock()) {
- context.collectWithTimestamp(makeWindowedValue(item, timestamp), timestamp.getMillis());
+ if (!inputAvailable && isRunning) {
+ // wait a bit until we retry to pull more records
+ Thread.sleep(50);
+ inputAvailable = reader.advance();
}
- inputAvailable = reader.advance();
+ if (inputAvailable) {
+
+ // get it and its timestamp from the source
+ T item = reader.getCurrent();
+ Instant timestamp = reader.getCurrentTimestamp();
+
+ // write it to the output collector
+ synchronized (ctx.getCheckpointLock()) {
+ context.collectWithTimestamp(makeWindowedValue(item, timestamp), timestamp.getMillis());
+ }
+
+ inputAvailable = reader.advance();
+ }
}
+ } finally {
+ reader.close();
}
}
@@ -131,4 +153,19 @@ public class UnboundedSourceWrapper<T> extends RichSourceFunction<WindowedValue<
private long getTimeToNextWaternark(long watermarkInterval) {
return System.currentTimeMillis() + watermarkInterval;
}
+
+
+ // Serialize the PipelineOptions as a JSON byte[] rather than streaming into 'out': Jackson closes
+ // the target stream by default (AUTO_CLOSE_TARGET), which would break the enclosing serialization.
+ private void writeObject(ObjectOutputStream out) throws IOException {
+ out.defaultWriteObject();
+ out.writeObject(new ObjectMapper().writeValueAsBytes(pipelineOptions));
+ }
+
+ // Counterpart of writeObject: restores the PipelineOptions needed to create the reader in run().
+ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+ in.defaultReadObject();
+ byte[] serializedOptions = (byte[]) in.readObject();
+ pipelineOptions = new ObjectMapper().readValue(serializedOptions, PipelineOptions.class);
+ }
}