You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this plain-text copy.
Posted to commits@beam.apache.org by mx...@apache.org on 2016/04/18 16:38:13 UTC
[1/4] incubator-beam git commit: [BEAM-158] add support for bounded sources in streaming
Repository: incubator-beam
Updated Branches:
refs/heads/master 7646384e2 -> 56e28a90f
[BEAM-158] add support for bounded sources in streaming
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/444a0bbb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/444a0bbb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/444a0bbb
Branch: refs/heads/master
Commit: 444a0bbba51a598689b7e6ccf11de5f6f23d5211
Parents: 7c4f2dc
Author: Maximilian Michels <mx...@apache.org>
Authored: Thu Mar 31 10:18:01 2016 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Mon Apr 18 16:36:43 2016 +0200
----------------------------------------------------------------------
.../FlinkStreamingTransformTranslators.java | 46 +++++++++++-
.../streaming/io/UnboundedSourceWrapper.java | 4 +-
.../beam/runners/flink/ReadSourceITCase.java | 9 ++-
.../flink/ReadSourceStreamingITCase.java | 74 ++++++++++++++++++++
4 files changed, 124 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/444a0bbb/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
index a1e9f47..927c3a2 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
@@ -20,6 +20,7 @@ package org.apache.beam.runners.flink.translation;
import org.apache.beam.runners.flink.translation.functions.UnionCoder;
import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
+import org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat;
import org.apache.beam.runners.flink.translation.wrappers.streaming.FlinkGroupAlsoByWindowWrapper;
import org.apache.beam.runners.flink.translation.wrappers.streaming.FlinkGroupByKeyWrapper;
import org.apache.beam.runners.flink.translation.wrappers.streaming.FlinkParDoBoundMultiWrapper;
@@ -29,6 +30,7 @@ import org.apache.beam.runners.flink.translation.wrappers.streaming.io.Unbounded
import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.Combine;
@@ -62,8 +64,12 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.IngestionTimeExtractor;
+import org.apache.flink.streaming.api.functions.TimestampAssigner;
+import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.util.Collector;
+import org.apache.kafka.common.utils.Time;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -94,6 +100,7 @@ public class FlinkStreamingTransformTranslators {
// here you can find all the available translators.
static {
TRANSLATORS.put(Create.Values.class, new CreateStreamingTranslator());
+ TRANSLATORS.put(Read.Bounded.class, new BoundedReadSourceTranslator());
TRANSLATORS.put(Read.Unbounded.class, new UnboundedReadSourceTranslator());
TRANSLATORS.put(ParDo.Bound.class, new ParDoBoundStreamingTranslator());
TRANSLATORS.put(TextIO.Write.Bound.class, new TextIOWriteBoundStreamingTranslator());
@@ -125,7 +132,7 @@ public class FlinkStreamingTransformTranslators {
// in the FlatMap function using the Coder.
List<byte[]> serializedElements = Lists.newArrayList();
- Coder<OUT> elementCoder = context.getOutput(transform).getCoder();
+ Coder<OUT> elementCoder = output.getCoder();
for (OUT element: elements) {
ByteArrayOutputStream bao = new ByteArrayOutputStream();
try {
@@ -148,7 +155,7 @@ public class FlinkStreamingTransformTranslators {
DataStream<WindowedValue<OUT>> outputDataStream = initDataSet.flatMap(createFunction)
.returns(outputType);
- context.setOutputDataStream(context.getOutput(transform), outputDataStream);
+ context.setOutputDataStream(output, outputDataStream);
}
}
@@ -186,6 +193,41 @@ public class FlinkStreamingTransformTranslators {
}
}
+ /**
+ * Translates a {@link Read.Bounded} transform for streaming execution.
+ *
+ * <p>The {@link BoundedSource} is wrapped, together with the pipeline options, in a
+ * {@link SourceInputFormat} and read via {@code createInput}. The records are typed with a
+ * {@link CoderTypeInformation} derived from the source's default output coder. Each record is
+ * then wrapped in a {@link WindowedValue} in the {@link GlobalWindow} with
+ * {@link PaneInfo#NO_FIRING}.
+ *
+ * <p>NOTE(review): each element is stamped with the wall-clock time at which it passes through
+ * the flatMap ({@code Instant.now()}), and Flink timestamps/watermarks come from
+ * {@link IngestionTimeExtractor}. {@code SourceInputFormat.nextRecord} emits only
+ * {@code reader.getCurrent()}, so any per-element timestamp the reader might expose is not
+ * propagated. Confirm this is the intended event-time behavior for bounded reads.
+ *
+ * @param <T> element type produced by the bounded source
+ */
+ private static class BoundedReadSourceTranslator<T>
+ implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<Read.Bounded<T>> {
+
+ /**
+ * Builds a Flink {@link DataStream} of windowed elements from the transform's bounded source
+ * and registers it as the output data stream of the transform's {@link PCollection}.
+ */
+ @Override
+ public void translateNode(Read.Bounded<T> transform, FlinkStreamingTranslationContext context) {
+
+ BoundedSource<T> boundedSource = transform.getSource();
+ PCollection<T> output = context.getOutput(transform);
+
+ // The raw records handed to Flink are typed via the source's default Beam coder.
+ Coder<T> defaultOutputCoder = boundedSource.getDefaultOutputCoder();
+ CoderTypeInformation<T> typeInfo = new CoderTypeInformation<>(defaultOutputCoder);
+
+ DataStream<T> source = context.getExecutionEnvironment().createInput(
+ new SourceInputFormat<>(
+ boundedSource,
+ context.getPipelineOptions()),
+ typeInfo);
+
+ // Wrap every raw record as a WindowedValue in the global window, timestamped with the
+ // current wall-clock time (not with any timestamp coming from the source).
+ DataStream<WindowedValue<T>> windowedStream = source.flatMap(
+ new FlatMapFunction<T, WindowedValue<T>>() {
+ @Override
+ public void flatMap(T value, Collector<WindowedValue<T>> out) throws Exception {
+ out.collect(
+ WindowedValue.of(value,
+ Instant.now(),
+ GlobalWindow.INSTANCE,
+ PaneInfo.NO_FIRING));
+ }
+ })
+ // Flink-level record timestamps and watermarks are assigned by IngestionTimeExtractor.
+ .assignTimestampsAndWatermarks(new IngestionTimeExtractor<WindowedValue<T>>());
+
+ context.setOutputDataStream(output, windowedStream);
+ }
+ }
+
private static class UnboundedReadSourceTranslator<T> implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<Read.Unbounded<T>> {
@Override
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/444a0bbb/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
index dcc7967..5be34e6 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
@@ -147,12 +147,12 @@ public class UnboundedSourceWrapper<T> extends RichSourceFunction<WindowedValue<
private void setNextWatermarkTimer(StreamingRuntimeContext runtime) {
if (this.isRunning) {
long watermarkInterval = runtime.getExecutionConfig().getAutoWatermarkInterval();
- long timeToNextWatermark = getTimeToNextWaternark(watermarkInterval);
+ long timeToNextWatermark = getTimeToNextWatermark(watermarkInterval);
runtime.registerTimer(timeToNextWatermark, this);
}
}
- private long getTimeToNextWaternark(long watermarkInterval) {
+ private long getTimeToNextWatermark(long watermarkInterval) {
return System.currentTimeMillis() + watermarkInterval;
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/444a0bbb/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java
index 4f63925..66c959e 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java
@@ -18,10 +18,7 @@
package org.apache.beam.runners.flink;
import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.io.CountingInput;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
@@ -31,6 +28,9 @@ import com.google.common.base.Joiner;
import org.apache.flink.test.util.JavaProgramTestBase;
+/**
+ * Reads from a bounded source in batch execution.
+ */
public class ReadSourceITCase extends JavaProgramTestBase {
protected String resultPath;
@@ -44,7 +44,6 @@ public class ReadSourceITCase extends JavaProgramTestBase {
@Override
protected void preSubmit() throws Exception {
resultPath = getTempDirPath("result");
- System.out.println(resultPath);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/444a0bbb/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingITCase.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingITCase.java
new file mode 100644
index 0000000..fe71802
--- /dev/null
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingITCase.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.CountingInput;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import com.google.common.base.Joiner;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+
+/**
+ * Integration test that reads from a bounded source in a streaming pipeline.
+ *
+ * <p>The pipeline is created with {@code FlinkTestPipeline.createForStreaming()}. It reads from
+ * {@code CountingInput.upTo(10)}, converts each {@code Long} element to its string form, and
+ * writes the strings with {@code TextIO.Write} to a temporary path. After the job, the written
+ * lines are compared against {@link #EXPECTED_RESULT} ("0" through "9").
+ */
+public class ReadSourceStreamingITCase extends StreamingProgramTestBase {
+
+ /** Temporary output location for the pipeline; assigned in {@link #preSubmit()}. */
+ protected String resultPath;
+
+ public ReadSourceStreamingITCase(){
+ }
+
+ /** Expected output lines: the string forms of 0 through 9. */
+ private static final String[] EXPECTED_RESULT = new String[] {
+ "0", "1", "2", "3", "4", "5", "6", "7", "8", "9"};
+
+ /** Obtains a temporary directory path named "result" for the pipeline output. */
+ @Override
+ protected void preSubmit() throws Exception {
+ resultPath = getTempDirPath("result");
+ }
+
+ /** Compares the files written under {@link #resultPath} with {@link #EXPECTED_RESULT}, one value per line. */
+ @Override
+ protected void postSubmit() throws Exception {
+ compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath);
+ }
+
+ /** Runs the pipeline, writing its output to {@link #resultPath}. */
+ @Override
+ protected void testProgram() throws Exception {
+ runProgram(resultPath);
+ }
+
+ /**
+ * Builds and runs the streaming pipeline: CountingInput.upTo(10) -> toString -> TextIO.Write.
+ *
+ * <p>Because this method is static, the anonymous DoFn below holds no reference to the
+ * enclosing test instance.
+ *
+ * @param resultPath output location passed to {@code TextIO.Write.to}
+ */
+ private static void runProgram(String resultPath) {
+
+ Pipeline p = FlinkTestPipeline.createForStreaming();
+
+ p
+ .apply(CountingInput.upTo(10))
+ .apply(ParDo.of(new DoFn<Long, String>() {
+ // Emit the decimal string form of each Long element.
+ @Override
+ public void processElement(ProcessContext c) throws Exception {
+ c.output(c.element().toString());
+ }
+ }))
+ .apply(TextIO.Write.to(resultPath));
+
+ p.run();
+ }
+}
+
+
[3/4] incubator-beam git commit: [flink] improve InputFormat wrapper and ReadSourceITCase
Posted by mx...@apache.org.
[flink] improve InputFormat wrapper and ReadSourceITCase
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6eac35e8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6eac35e8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6eac35e8
Branch: refs/heads/master
Commit: 6eac35e81e93c25da4668fc1b0d30f7c942383f0
Parents: 7646384
Author: Maximilian Michels <mx...@apache.org>
Authored: Wed Mar 30 16:43:04 2016 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Mon Apr 18 16:36:43 2016 +0200
----------------------------------------------------------------------
.../translation/wrappers/SourceInputFormat.java | 83 +++++++--------
.../beam/runners/flink/ReadSourceITCase.java | 100 ++-----------------
2 files changed, 43 insertions(+), 140 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6eac35e8/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
index 26e6297..4b11abc 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java
@@ -23,20 +23,20 @@ import org.apache.beam.sdk.options.PipelineOptions;
import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.List;
+
/**
* A Flink {@link org.apache.flink.api.common.io.InputFormat} that wraps a
* Dataflow {@link org.apache.beam.sdk.io.Source}.
@@ -45,37 +45,40 @@ public class SourceInputFormat<T> implements InputFormat<T, SourceInputSplit<T>>
private static final Logger LOG = LoggerFactory.getLogger(SourceInputFormat.class);
private final BoundedSource<T> initialSource;
+
private transient PipelineOptions options;
+ private final byte[] serializedOptions;
- private BoundedSource.BoundedReader<T> reader = null;
- private boolean reachedEnd = true;
+ private transient BoundedSource.BoundedReader<T> reader = null;
+ private boolean inputAvailable = true;
public SourceInputFormat(BoundedSource<T> initialSource, PipelineOptions options) {
this.initialSource = initialSource;
this.options = options;
- }
- private void writeObject(ObjectOutputStream out)
- throws IOException, ClassNotFoundException {
- out.defaultWriteObject();
- ObjectMapper mapper = new ObjectMapper();
- mapper.writeValue(out, options);
- }
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ try {
+ new ObjectMapper().writeValue(baos, options);
+ serializedOptions = baos.toByteArray();
+ } catch (Exception e) {
+ throw new RuntimeException("Couldn't serialize PipelineOptions.", e);
+ }
- private void readObject(ObjectInputStream in)
- throws IOException, ClassNotFoundException {
- in.defaultReadObject();
- ObjectMapper mapper = new ObjectMapper();
- options = mapper.readValue(in, PipelineOptions.class);
}
@Override
- public void configure(Configuration configuration) {}
+ public void configure(Configuration configuration) {
+ try {
+ options = new ObjectMapper().readValue(serializedOptions, PipelineOptions.class);
+ } catch (IOException e) {
+ throw new RuntimeException("Couldn't deserialize the PipelineOptions.", e);
+ }
+ }
@Override
public void open(SourceInputSplit<T> sourceInputSplit) throws IOException {
reader = ((BoundedSource<T>) sourceInputSplit.getSource()).createReader(options);
- reachedEnd = false;
+ inputAvailable = reader.start();
}
@Override
@@ -87,7 +90,6 @@ public class SourceInputFormat<T> implements InputFormat<T, SourceInputSplit<T>>
@Override
public long getTotalInputSize() {
return estimatedSize;
-
}
@Override
@@ -110,17 +112,15 @@ public class SourceInputFormat<T> implements InputFormat<T, SourceInputSplit<T>>
@Override
@SuppressWarnings("unchecked")
public SourceInputSplit<T>[] createInputSplits(int numSplits) throws IOException {
- long desiredSizeBytes;
try {
- desiredSizeBytes = initialSource.getEstimatedSizeBytes(options) / numSplits;
- List<? extends Source<T>> shards = initialSource.splitIntoBundles(desiredSizeBytes,
- options);
- List<SourceInputSplit<T>> splits = new ArrayList<>();
- int splitCount = 0;
- for (Source<T> shard: shards) {
- splits.add(new SourceInputSplit<>(shard, splitCount++));
+ long desiredSizeBytes = initialSource.getEstimatedSizeBytes(options) / numSplits;
+ List<? extends Source<T>> shards = initialSource.splitIntoBundles(desiredSizeBytes, options);
+ int numShards = shards.size();
+ SourceInputSplit<T>[] sourceInputSplits = new SourceInputSplit[numShards];
+ for (int i = 0; i < numShards; i++) {
+ sourceInputSplits[i] = new SourceInputSplit<>(shards.get(i), i);
}
- return splits.toArray(new SourceInputSplit[splits.size()]);
+ return sourceInputSplits;
} catch (Exception e) {
throw new IOException("Could not create input splits from Source.", e);
}
@@ -128,33 +128,24 @@ public class SourceInputFormat<T> implements InputFormat<T, SourceInputSplit<T>>
@Override
public InputSplitAssigner getInputSplitAssigner(final SourceInputSplit[] sourceInputSplits) {
- return new InputSplitAssigner() {
- private int index = 0;
- private final SourceInputSplit[] splits = sourceInputSplits;
- @Override
- public InputSplit getNextInputSplit(String host, int taskId) {
- if (index < splits.length) {
- return splits[index++];
- } else {
- return null;
- }
- }
- };
+ return new DefaultInputSplitAssigner(sourceInputSplits);
}
@Override
public boolean reachedEnd() throws IOException {
- return reachedEnd;
+ return !inputAvailable;
}
@Override
public T nextRecord(T t) throws IOException {
-
- reachedEnd = !reader.advance();
- if (!reachedEnd) {
- return reader.getCurrent();
+ if (inputAvailable) {
+ final T current = reader.getCurrent();
+ // advance reader to have a record ready next time
+ inputAvailable = reader.advance();
+ return current;
}
+
return null;
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6eac35e8/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java
index bcad6f1..4f63925 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java
@@ -23,21 +23,14 @@ import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
import org.apache.flink.test.util.JavaProgramTestBase;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-
public class ReadSourceITCase extends JavaProgramTestBase {
protected String resultPath;
@@ -45,12 +38,13 @@ public class ReadSourceITCase extends JavaProgramTestBase {
public ReadSourceITCase(){
}
- static final String[] EXPECTED_RESULT = new String[] {
- "1", "2", "3", "4", "5", "6", "7", "8", "9"};
+ private static final String[] EXPECTED_RESULT = new String[] {
+ "0", "1", "2", "3", "4", "5", "6", "7", "8", "9"};
@Override
protected void preSubmit() throws Exception {
resultPath = getTempDirPath("result");
+ System.out.println(resultPath);
}
@Override
@@ -68,8 +62,8 @@ public class ReadSourceITCase extends JavaProgramTestBase {
Pipeline p = FlinkTestPipeline.createForBatch();
PCollection<String> result = p
- .apply(Read.from(new ReadSource(1, 10)))
- .apply(ParDo.of(new DoFn<Integer, String>() {
+ .apply(CountingInput.upTo(10))
+ .apply(ParDo.of(new DoFn<Long, String>() {
@Override
public void processElement(ProcessContext c) throws Exception {
c.output(c.element().toString());
@@ -77,90 +71,8 @@ public class ReadSourceITCase extends JavaProgramTestBase {
}));
result.apply(TextIO.Write.to(resultPath));
- p.run();
- }
-
- private static class ReadSource extends BoundedSource<Integer> {
- final int from;
- final int to;
-
- ReadSource(int from, int to) {
- this.from = from;
- this.to = to;
- }
-
- @Override
- public List<ReadSource> splitIntoBundles(long desiredShardSizeBytes, PipelineOptions options)
- throws Exception {
- List<ReadSource> res = new ArrayList<>();
- FlinkPipelineOptions flinkOptions = options.as(FlinkPipelineOptions.class);
- int numWorkers = flinkOptions.getParallelism();
- Preconditions.checkArgument(numWorkers > 0, "Number of workers should be larger than 0.");
-
- float step = 1.0f * (to - from) / numWorkers;
- for (int i = 0; i < numWorkers; ++i) {
- res.add(new ReadSource(Math.round(from + i * step), Math.round(from + (i + 1) * step)));
- }
- return res;
- }
-
- @Override
- public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
- return 8 * (to - from);
- }
-
- @Override
- public boolean producesSortedKeys(PipelineOptions options) throws Exception {
- return true;
- }
-
- @Override
- public BoundedReader<Integer> createReader(PipelineOptions options) throws IOException {
- return new RangeReader(this);
- }
-
- @Override
- public void validate() {}
-
- @Override
- public Coder<Integer> getDefaultOutputCoder() {
- return BigEndianIntegerCoder.of();
- }
-
- private class RangeReader extends BoundedReader<Integer> {
- private int current;
-
- public RangeReader(ReadSource source) {
- this.current = source.from - 1;
- }
-
- @Override
- public boolean start() throws IOException {
- return true;
- }
-
- @Override
- public boolean advance() throws IOException {
- current++;
- return (current < to);
- }
-
- @Override
- public Integer getCurrent() {
- return current;
- }
-
- @Override
- public void close() throws IOException {
- // Nothing
- }
-
- @Override
- public BoundedSource<Integer> getCurrentSource() {
- return ReadSource.this;
- }
- }
+ p.run();
}
}
[4/4] incubator-beam git commit: This closes #104
Posted by mx...@apache.org.
This closes #104
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/56e28a90
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/56e28a90
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/56e28a90
Branch: refs/heads/master
Commit: 56e28a90fe5b95c6d2c25149db1a3fe8c2205fb4
Parents: 7646384 444a0bb
Author: Maximilian Michels <mx...@apache.org>
Authored: Mon Apr 18 16:37:04 2016 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Mon Apr 18 16:37:04 2016 +0200
----------------------------------------------------------------------
.../FlinkStreamingTransformTranslators.java | 63 +++++++++--
.../translation/wrappers/SourceInputFormat.java | 83 +++++++-------
.../streaming/io/UnboundedSourceWrapper.java | 4 +-
.../beam/runners/flink/ReadSourceITCase.java | 107 ++-----------------
.../flink/ReadSourceStreamingITCase.java | 74 +++++++++++++
5 files changed, 178 insertions(+), 153 deletions(-)
----------------------------------------------------------------------
[2/4] incubator-beam git commit: [flink] improvements to UnboundedSource translation
Posted by mx...@apache.org.
[flink] improvements to UnboundedSource translation
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7c4f2dc1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7c4f2dc1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7c4f2dc1
Branch: refs/heads/master
Commit: 7c4f2dc1e74d7d985fef80cc3cbccb6e390d16aa
Parents: 6eac35e
Author: Maximilian Michels <mx...@apache.org>
Authored: Wed Mar 30 19:05:27 2016 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Mon Apr 18 16:36:43 2016 +0200
----------------------------------------------------------------------
.../FlinkStreamingTransformTranslators.java | 17 ++++++++++++-----
1 file changed, 12 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7c4f2dc1/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
index 541cd40..a1e9f47 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
@@ -194,19 +194,26 @@ public class FlinkStreamingTransformTranslators {
DataStream<WindowedValue<T>> source;
if (transform.getSource().getClass().equals(UnboundedFlinkSource.class)) {
- UnboundedFlinkSource flinkSource = (UnboundedFlinkSource) transform.getSource();
+ @SuppressWarnings("unchecked")
+ UnboundedFlinkSource<T> flinkSource = (UnboundedFlinkSource<T>) transform.getSource();
source = context.getExecutionEnvironment()
.addSource(flinkSource.getFlinkSource())
- .flatMap(new FlatMapFunction<String, WindowedValue<String>>() {
+ .flatMap(new FlatMapFunction<T, WindowedValue<T>>() {
@Override
- public void flatMap(String s, Collector<WindowedValue<String>> collector) throws Exception {
- collector.collect(WindowedValue.<String>of(s, Instant.now(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING));
+ public void flatMap(T s, Collector<WindowedValue<T>> collector) throws Exception {
+ collector.collect(
+ WindowedValue.of(
+ s,
+ Instant.now(),
+ GlobalWindow.INSTANCE,
+ PaneInfo.NO_FIRING));
}
- }).assignTimestampsAndWatermarks(new IngestionTimeExtractor());
+ }).assignTimestampsAndWatermarks(new IngestionTimeExtractor<WindowedValue<T>>());
} else {
source = context.getExecutionEnvironment()
.addSource(new UnboundedSourceWrapper<>(context.getPipelineOptions(), transform));
}
+
context.setOutputDataStream(output, source);
}
}