You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by mx...@apache.org on 2017/01/13 18:36:17 UTC
[1/4] beam git commit: [BEAM-1255] java.io.NotSerializableException
in flink on UnboundedSource fix javadoc for BoundedSourceWrapper
Repository: beam
Updated Branches:
refs/heads/master f1ea8f951 -> eaf4450f2
[BEAM-1255] java.io.NotSerializableException in flink on UnboundedSource
fix javadoc for BoundedSourceWrapper
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c2344e94
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c2344e94
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c2344e94
Branch: refs/heads/master
Commit: c2344e944b25d884f25375ea7fce3a9c203cdb9a
Parents: a93e218
Author: Alexey Diomin <di...@gmail.com>
Authored: Thu Jan 12 10:44:43 2017 +0400
Committer: Alexey Diomin <di...@gmail.com>
Committed: Thu Jan 12 15:40:37 2017 +0400
----------------------------------------------------------------------
.../streaming/io/BoundedSourceWrapper.java | 2 +-
.../streaming/io/UnboundedSourceWrapper.java | 2 +-
.../streaming/UnboundedSourceWrapperTest.java | 464 ++++++++++---------
3 files changed, 250 insertions(+), 218 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/c2344e94/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java
index df49a49..909cb0e 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java
@@ -35,7 +35,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Wrapper for executing {@link BoundedSource UnboundedSources} as a Flink Source.
+ * Wrapper for executing {@link BoundedSource BoundedSources} as a Flink Source.
*/
public class BoundedSourceWrapper<OutputT>
extends RichParallelSourceFunction<WindowedValue<OutputT>>
http://git-wip-us.apache.org/repos/asf/beam/blob/c2344e94/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
index af955ba..68746b2 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
@@ -143,7 +143,7 @@ public class UnboundedSourceWrapper<
} else {
Coder<? extends UnboundedSource<OutputT, CheckpointMarkT>> sourceCoder =
- SerializableCoder.of(new TypeDescriptor<UnboundedSource<OutputT, CheckpointMarkT>>() {
+ (Coder) SerializableCoder.of(new TypeDescriptor<UnboundedSource>() {
});
checkpointCoder = (ListCoder) ListCoder.of(KvCoder.of(sourceCoder, checkpointMarkCoder));
http://git-wip-us.apache.org/repos/asf/beam/blob/c2344e94/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
index 9e8261a..b0be98b 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
@@ -46,259 +46,291 @@ import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.util.InstantiationUtil;
import org.junit.Test;
+import org.junit.experimental.runners.Enclosed;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
/**
* Tests for {@link UnboundedSourceWrapper}.
*/
-@RunWith(Parameterized.class)
+@RunWith(Enclosed.class)
public class UnboundedSourceWrapperTest {
- private final int numTasks;
- private final int numSplits;
+ /**
+ * Parameterized tests.
+ */
+ @RunWith(Parameterized.class)
+ public static class UnboundedSourceWrapperTestWithParams {
+ private final int numTasks;
+ private final int numSplits;
+
+ public UnboundedSourceWrapperTestWithParams(int numTasks, int numSplits) {
+ this.numTasks = numTasks;
+ this.numSplits = numSplits;
+ }
- public UnboundedSourceWrapperTest(int numTasks, int numSplits) {
- this.numTasks = numTasks;
- this.numSplits = numSplits;
- }
+ @Parameterized.Parameters
+ public static Collection<Object[]> data() {
+ /*
+ * Parameters for initializing the tests:
+ * {numTasks, numSplits}
+ * The test currently assumes powers of two for some assertions.
+ */
+ return Arrays.asList(new Object[][]{
+ {1, 1}, {1, 2}, {1, 4},
+ {2, 1}, {2, 2}, {2, 4},
+ {4, 1}, {4, 2}, {4, 4}
+ });
+ }
- @Parameterized.Parameters
- public static Collection<Object[]> data() {
- /*
- * Parameters for initializing the tests:
- * {numTasks, numSplits}
- * The test currently assumes powers of two for some assertions.
+ /**
+ * Creates a {@link UnboundedSourceWrapper} that has one or multiple readers per source.
+ * If numSplits > numTasks the source has one source will manage multiple readers.
*/
- return Arrays.asList(new Object[][] {
- {1, 1}, {1, 2}, {1, 4},
- {2, 1}, {2, 2}, {2, 4},
- {4, 1}, {4, 2}, {4, 4}
- });
- }
+ @Test
+ public void testReaders() throws Exception {
+ final int numElements = 20;
+ final Object checkpointLock = new Object();
+ PipelineOptions options = PipelineOptionsFactory.create();
+
+ // this source will emit exactly NUM_ELEMENTS across all parallel readers,
+ // afterwards it will stall. We check whether we also receive NUM_ELEMENTS
+ // elements later.
+ TestCountingSource source = new TestCountingSource(numElements);
+ UnboundedSourceWrapper<KV<Integer, Integer>, TestCountingSource.CounterMark> flinkWrapper =
+ new UnboundedSourceWrapper<>(options, source, numSplits);
+
+ assertEquals(numSplits, flinkWrapper.getSplitSources().size());
+
+ StreamSource<WindowedValue<
+ KV<Integer, Integer>>,
+ UnboundedSourceWrapper<
+ KV<Integer, Integer>,
+ TestCountingSource.CounterMark>> sourceOperator = new StreamSource<>(flinkWrapper);
+
+ setupSourceOperator(sourceOperator, numTasks);
+
+ try {
+ sourceOperator.open();
+ sourceOperator.run(checkpointLock,
+ new Output<StreamRecord<WindowedValue<KV<Integer, Integer>>>>() {
+ private int count = 0;
+
+ @Override
+ public void emitWatermark(Watermark watermark) {
+ }
- /**
- * Creates a {@link UnboundedSourceWrapper} that has one or multiple readers per source.
- * If numSplits > numTasks the source has one source will manage multiple readers.
- */
- @Test
- public void testReaders() throws Exception {
- final int numElements = 20;
- final Object checkpointLock = new Object();
- PipelineOptions options = PipelineOptionsFactory.create();
-
- // this source will emit exactly NUM_ELEMENTS across all parallel readers,
- // afterwards it will stall. We check whether we also receive NUM_ELEMENTS
- // elements later.
- TestCountingSource source = new TestCountingSource(numElements);
- UnboundedSourceWrapper<KV<Integer, Integer>, TestCountingSource.CounterMark> flinkWrapper =
- new UnboundedSourceWrapper<>(options, source, numSplits);
-
- assertEquals(numSplits, flinkWrapper.getSplitSources().size());
-
- StreamSource<WindowedValue<
- KV<Integer, Integer>>,
- UnboundedSourceWrapper<
- KV<Integer, Integer>,
- TestCountingSource.CounterMark>> sourceOperator = new StreamSource<>(flinkWrapper);
-
- setupSourceOperator(sourceOperator, numTasks);
-
- try {
- sourceOperator.open();
- sourceOperator.run(checkpointLock,
- new Output<StreamRecord<WindowedValue<KV<Integer, Integer>>>>() {
- private int count = 0;
-
- @Override
- public void emitWatermark(Watermark watermark) {
- }
-
- @Override
- public void collect(
- StreamRecord<WindowedValue<KV<Integer, Integer>>> windowedValueStreamRecord) {
-
- count++;
- if (count >= numElements) {
- throw new SuccessException();
+ @Override
+ public void collect(
+ StreamRecord<WindowedValue<KV<Integer, Integer>>> windowedValueStreamRecord) {
+
+ count++;
+ if (count >= numElements) {
+ throw new SuccessException();
+ }
}
- }
- @Override
- public void close() {
+ @Override
+ public void close() {
- }
- });
- } catch (SuccessException e) {
+ }
+ });
+ } catch (SuccessException e) {
- assertEquals(Math.max(1, numSplits / numTasks), flinkWrapper.getLocalSplitSources().size());
+ assertEquals(Math.max(1, numSplits / numTasks), flinkWrapper.getLocalSplitSources().size());
- // success
- return;
+ // success
+ return;
+ }
+ fail("Read terminated without producing expected number of outputs");
}
- fail("Read terminated without producing expected number of outputs");
- }
- /**
- * Verify that snapshot/restore work as expected. We bring up a source and cancel
- * after seeing a certain number of elements. Then we snapshot that source,
- * bring up a completely new source that we restore from the snapshot and verify
- * that we see all expected elements in the end.
- */
- @Test
- public void testRestore() throws Exception {
- final int numElements = 20;
- final Object checkpointLock = new Object();
- PipelineOptions options = PipelineOptionsFactory.create();
-
- // this source will emit exactly NUM_ELEMENTS across all parallel readers,
- // afterwards it will stall. We check whether we also receive NUM_ELEMENTS
- // elements later.
- TestCountingSource source = new TestCountingSource(numElements);
- UnboundedSourceWrapper<KV<Integer, Integer>, TestCountingSource.CounterMark> flinkWrapper =
- new UnboundedSourceWrapper<>(options, source, numSplits);
-
- assertEquals(numSplits, flinkWrapper.getSplitSources().size());
-
- StreamSource<
- WindowedValue<KV<Integer, Integer>>,
- UnboundedSourceWrapper<
- KV<Integer, Integer>,
- TestCountingSource.CounterMark>> sourceOperator = new StreamSource<>(flinkWrapper);
-
- setupSourceOperator(sourceOperator, numTasks);
-
- final Set<KV<Integer, Integer>> emittedElements = new HashSet<>();
-
- boolean readFirstBatchOfElements = false;
-
- try {
- sourceOperator.open();
- sourceOperator.run(checkpointLock,
- new Output<StreamRecord<WindowedValue<KV<Integer, Integer>>>>() {
- private int count = 0;
-
- @Override
- public void emitWatermark(Watermark watermark) {
- }
-
- @Override
- public void collect(
- StreamRecord<WindowedValue<KV<Integer, Integer>>> windowedValueStreamRecord) {
-
- emittedElements.add(windowedValueStreamRecord.getValue().getValue());
- count++;
- if (count >= numElements / 2) {
- throw new SuccessException();
+ /**
+ * Verify that snapshot/restore work as expected. We bring up a source and cancel
+ * after seeing a certain number of elements. Then we snapshot that source,
+ * bring up a completely new source that we restore from the snapshot and verify
+ * that we see all expected elements in the end.
+ */
+ @Test
+ public void testRestore() throws Exception {
+ final int numElements = 20;
+ final Object checkpointLock = new Object();
+ PipelineOptions options = PipelineOptionsFactory.create();
+
+ // this source will emit exactly NUM_ELEMENTS across all parallel readers,
+ // afterwards it will stall. We check whether we also receive NUM_ELEMENTS
+ // elements later.
+ TestCountingSource source = new TestCountingSource(numElements);
+ UnboundedSourceWrapper<KV<Integer, Integer>, TestCountingSource.CounterMark> flinkWrapper =
+ new UnboundedSourceWrapper<>(options, source, numSplits);
+
+ assertEquals(numSplits, flinkWrapper.getSplitSources().size());
+
+ StreamSource<
+ WindowedValue<KV<Integer, Integer>>,
+ UnboundedSourceWrapper<
+ KV<Integer, Integer>,
+ TestCountingSource.CounterMark>> sourceOperator = new StreamSource<>(flinkWrapper);
+
+ setupSourceOperator(sourceOperator, numTasks);
+
+ final Set<KV<Integer, Integer>> emittedElements = new HashSet<>();
+
+ boolean readFirstBatchOfElements = false;
+
+ try {
+ sourceOperator.open();
+ sourceOperator.run(checkpointLock,
+ new Output<StreamRecord<WindowedValue<KV<Integer, Integer>>>>() {
+ private int count = 0;
+
+ @Override
+ public void emitWatermark(Watermark watermark) {
}
- }
- @Override
- public void close() {
+ @Override
+ public void collect(
+ StreamRecord<WindowedValue<KV<Integer, Integer>>> windowedValueStreamRecord) {
- }
- });
- } catch (SuccessException e) {
- // success
- readFirstBatchOfElements = true;
- }
+ emittedElements.add(windowedValueStreamRecord.getValue().getValue());
+ count++;
+ if (count >= numElements / 2) {
+ throw new SuccessException();
+ }
+ }
+
+ @Override
+ public void close() {
+
+ }
+ });
+ } catch (SuccessException e) {
+ // success
+ readFirstBatchOfElements = true;
+ }
+
+ assertTrue("Did not successfully read first batch of elements.", readFirstBatchOfElements);
+
+ // draw a snapshot
+ byte[] snapshot = flinkWrapper.snapshotState(0, 0);
+
+ // test that finalizeCheckpoint on CheckpointMark is called
+ final ArrayList<Integer> finalizeList = new ArrayList<>();
+ TestCountingSource.setFinalizeTracker(finalizeList);
+ flinkWrapper.notifyCheckpointComplete(0);
+ assertEquals(flinkWrapper.getLocalSplitSources().size(), finalizeList.size());
+
+ // create a completely new source but restore from the snapshot
+ TestCountingSource restoredSource = new TestCountingSource(numElements);
+ UnboundedSourceWrapper<
+ KV<Integer, Integer>, TestCountingSource.CounterMark> restoredFlinkWrapper =
+ new UnboundedSourceWrapper<>(options, restoredSource, numSplits);
+
+ assertEquals(numSplits, restoredFlinkWrapper.getSplitSources().size());
+
+ StreamSource<
+ WindowedValue<KV<Integer, Integer>>,
+ UnboundedSourceWrapper<
+ KV<Integer, Integer>,
+ TestCountingSource.CounterMark>> restoredSourceOperator =
+ new StreamSource<>(restoredFlinkWrapper);
+
+ setupSourceOperator(restoredSourceOperator, numTasks);
+
+ // restore snapshot
+ restoredFlinkWrapper.restoreState(snapshot);
+
+ boolean readSecondBatchOfElements = false;
+
+ // run again and verify that we see the other elements
+ try {
+ restoredSourceOperator.open();
+ restoredSourceOperator.run(checkpointLock,
+ new Output<StreamRecord<WindowedValue<KV<Integer, Integer>>>>() {
+ private int count = 0;
+
+ @Override
+ public void emitWatermark(Watermark watermark) {
+ }
- assertTrue("Did not successfully read first batch of elements.", readFirstBatchOfElements);
-
- // draw a snapshot
- byte[] snapshot = flinkWrapper.snapshotState(0, 0);
-
- // test that finalizeCheckpoint on CheckpointMark is called
- final ArrayList<Integer> finalizeList = new ArrayList<>();
- TestCountingSource.setFinalizeTracker(finalizeList);
- flinkWrapper.notifyCheckpointComplete(0);
- assertEquals(flinkWrapper.getLocalSplitSources().size(), finalizeList.size());
-
- // create a completely new source but restore from the snapshot
- TestCountingSource restoredSource = new TestCountingSource(numElements);
- UnboundedSourceWrapper<
- KV<Integer, Integer>, TestCountingSource.CounterMark> restoredFlinkWrapper =
- new UnboundedSourceWrapper<>(options, restoredSource, numSplits);
-
- assertEquals(numSplits, restoredFlinkWrapper.getSplitSources().size());
-
- StreamSource<
- WindowedValue<KV<Integer, Integer>>,
- UnboundedSourceWrapper<
- KV<Integer, Integer>,
- TestCountingSource.CounterMark>> restoredSourceOperator =
- new StreamSource<>(restoredFlinkWrapper);
-
- setupSourceOperator(restoredSourceOperator, numTasks);
-
- // restore snapshot
- restoredFlinkWrapper.restoreState(snapshot);
-
- boolean readSecondBatchOfElements = false;
-
- // run again and verify that we see the other elements
- try {
- restoredSourceOperator.open();
- restoredSourceOperator.run(checkpointLock,
- new Output<StreamRecord<WindowedValue<KV<Integer, Integer>>>>() {
- private int count = 0;
-
- @Override
- public void emitWatermark(Watermark watermark) {
- }
-
- @Override
- public void collect(
- StreamRecord<WindowedValue<KV<Integer, Integer>>> windowedValueStreamRecord) {
- emittedElements.add(windowedValueStreamRecord.getValue().getValue());
- count++;
- if (count >= numElements / 2) {
- throw new SuccessException();
+ @Override
+ public void collect(
+ StreamRecord<WindowedValue<KV<Integer, Integer>>> windowedValueStreamRecord) {
+ emittedElements.add(windowedValueStreamRecord.getValue().getValue());
+ count++;
+ if (count >= numElements / 2) {
+ throw new SuccessException();
+ }
}
- }
- @Override
- public void close() {
+ @Override
+ public void close() {
- }
- });
- } catch (SuccessException e) {
- // success
- readSecondBatchOfElements = true;
- }
+ }
+ });
+ } catch (SuccessException e) {
+ // success
+ readSecondBatchOfElements = true;
+ }
- assertEquals(Math.max(1, numSplits / numTasks), flinkWrapper.getLocalSplitSources().size());
+ assertEquals(Math.max(1, numSplits / numTasks), flinkWrapper.getLocalSplitSources().size());
- assertTrue("Did not successfully read second batch of elements.", readSecondBatchOfElements);
+ assertTrue("Did not successfully read second batch of elements.", readSecondBatchOfElements);
- // verify that we saw all NUM_ELEMENTS elements
- assertTrue(emittedElements.size() == numElements);
- }
+ // verify that we saw all NUM_ELEMENTS elements
+ assertTrue(emittedElements.size() == numElements);
+ }
- @SuppressWarnings("unchecked")
- private static <T> void setupSourceOperator(StreamSource<T, ?> operator, int numSubTasks) {
- ExecutionConfig executionConfig = new ExecutionConfig();
- StreamConfig cfg = new StreamConfig(new Configuration());
+ @SuppressWarnings("unchecked")
+ private static <T> void setupSourceOperator(StreamSource<T, ?> operator, int numSubTasks) {
+ ExecutionConfig executionConfig = new ExecutionConfig();
+ StreamConfig cfg = new StreamConfig(new Configuration());
- cfg.setTimeCharacteristic(TimeCharacteristic.EventTime);
+ cfg.setTimeCharacteristic(TimeCharacteristic.EventTime);
- Environment env = new DummyEnvironment("MockTwoInputTask", numSubTasks, 0);
+ Environment env = new DummyEnvironment("MockTwoInputTask", numSubTasks, 0);
- StreamTask<?, ?> mockTask = mock(StreamTask.class);
- when(mockTask.getName()).thenReturn("Mock Task");
- when(mockTask.getCheckpointLock()).thenReturn(new Object());
- when(mockTask.getConfiguration()).thenReturn(cfg);
- when(mockTask.getEnvironment()).thenReturn(env);
- when(mockTask.getExecutionConfig()).thenReturn(executionConfig);
- when(mockTask.getAccumulatorMap())
- .thenReturn(Collections.<String, Accumulator<?, ?>>emptyMap());
+ StreamTask<?, ?> mockTask = mock(StreamTask.class);
+ when(mockTask.getName()).thenReturn("Mock Task");
+ when(mockTask.getCheckpointLock()).thenReturn(new Object());
+ when(mockTask.getConfiguration()).thenReturn(cfg);
+ when(mockTask.getEnvironment()).thenReturn(env);
+ when(mockTask.getExecutionConfig()).thenReturn(executionConfig);
+ when(mockTask.getAccumulatorMap())
+ .thenReturn(Collections.<String, Accumulator<?, ?>>emptyMap());
- operator.setup(mockTask, cfg, (Output< StreamRecord<T>>) mock(Output.class));
+ operator.setup(mockTask, cfg, (Output<StreamRecord<T>>) mock(Output.class));
+ }
+
+ /**
+ * A special {@link RuntimeException} that we throw to signal that the test was successful.
+ */
+ private static class SuccessException extends RuntimeException {
+ }
}
/**
- * A special {@link RuntimeException} that we throw to signal that the test was successful.
+ * Not parameterized tests.
*/
- private static class SuccessException extends RuntimeException {}
+ public static class BasicTest {
+
+ /**
+ * Check serialization a {@link UnboundedSourceWrapper}.
+ */
+ @Test
+ public void testSerialization() throws Exception {
+ final int parallelism = 1;
+ final int numElements = 20;
+ PipelineOptions options = PipelineOptionsFactory.create();
+
+ TestCountingSource source = new TestCountingSource(numElements);
+ UnboundedSourceWrapper<KV<Integer, Integer>, TestCountingSource.CounterMark> flinkWrapper =
+ new UnboundedSourceWrapper<>(options, source, parallelism);
+
+ InstantiationUtil.serializeObject(flinkWrapper);
+ }
+
+ }
}
[4/4] beam git commit: This closes #1765
Posted by mx...@apache.org.
This closes #1765
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/eaf4450f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/eaf4450f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/eaf4450f
Branch: refs/heads/master
Commit: eaf4450f277ac7ab52fdba88c25415d5a0246c62
Parents: d3b126f 078573e
Author: Maximilian Michels <mx...@apache.org>
Authored: Wed Jan 11 19:25:33 2017 +0100
Committer: Maximilian Michels <mx...@apache.org>
Committed: Fri Jan 13 19:27:48 2017 +0100
----------------------------------------------------------------------
.../beam/runners/flink/examples/streaming/KafkaIOExamples.java | 4 +++-
1 file changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
[3/4] beam git commit: [BEAM-1229] flink KafkaIOExamples submit error
Posted by mx...@apache.org.
[BEAM-1229] flink KafkaIOExamples submit error
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/078573e3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/078573e3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/078573e3
Branch: refs/heads/master
Commit: 078573e30b4c9cb29b3c548f8859bb5b23a7a9d1
Parents: 51820cb
Author: Alexey Diomin <di...@gmail.com>
Authored: Wed Jan 11 17:08:35 2017 +0400
Committer: Maximilian Michels <mx...@apache.org>
Committed: Fri Jan 13 19:27:04 2017 +0100
----------------------------------------------------------------------
.../beam/runners/flink/examples/streaming/KafkaIOExamples.java | 4 +++-
1 file changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/078573e3/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java
index 3c8a89b..616e276 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java
@@ -29,6 +29,7 @@ import org.apache.beam.runners.flink.translation.wrappers.streaming.io.Unbounded
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.Write;
import org.apache.beam.sdk.options.Default;
@@ -78,7 +79,7 @@ public class KafkaIOExamples {
new SimpleStringSchema(), getKafkaProps(options));
p
- .apply(Read.from(UnboundedFlinkSource.of(kafkaConsumer)))
+ .apply(Read.from(UnboundedFlinkSource.of(kafkaConsumer))).setCoder(StringUtf8Coder.of())
.apply(ParDo.of(new PrintFn<>()));
p.run();
@@ -133,6 +134,7 @@ public class KafkaIOExamples {
p
.apply(Read.from(UnboundedFlinkSource.of(kafkaConsumer)))
+ .setCoder(AvroCoder.of(MyType.class))
.apply(ParDo.of(new PrintFn<>()));
p.run();
[2/4] beam git commit: This closes #1770
Posted by mx...@apache.org.
This closes #1770
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d3b126f5
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d3b126f5
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d3b126f5
Branch: refs/heads/master
Commit: d3b126f5159a2802f91e302d120187d781826271
Parents: f1ea8f9 c2344e9
Author: Maximilian Michels <mx...@apache.org>
Authored: Tue Jan 10 19:25:33 2017 +0100
Committer: Maximilian Michels <mx...@apache.org>
Committed: Fri Jan 13 19:26:25 2017 +0100
----------------------------------------------------------------------
.../streaming/io/BoundedSourceWrapper.java | 2 +-
.../streaming/io/UnboundedSourceWrapper.java | 2 +-
.../streaming/UnboundedSourceWrapperTest.java | 464 ++++++++++---------
3 files changed, 250 insertions(+), 218 deletions(-)
----------------------------------------------------------------------