You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by mx...@apache.org on 2017/01/13 18:36:17 UTC

[1/4] beam git commit: [BEAM-1255] java.io.NotSerializableException in flink on UnboundedSource fix javadoc for BoundedSourceWrapper

Repository: beam
Updated Branches:
  refs/heads/master f1ea8f951 -> eaf4450f2


[BEAM-1255] java.io.NotSerializableException in flink on UnboundedSource
fix javadoc for BoundedSourceWrapper


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c2344e94
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c2344e94
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c2344e94

Branch: refs/heads/master
Commit: c2344e944b25d884f25375ea7fce3a9c203cdb9a
Parents: a93e218
Author: Alexey Diomin <di...@gmail.com>
Authored: Thu Jan 12 10:44:43 2017 +0400
Committer: Alexey Diomin <di...@gmail.com>
Committed: Thu Jan 12 15:40:37 2017 +0400

----------------------------------------------------------------------
 .../streaming/io/BoundedSourceWrapper.java      |   2 +-
 .../streaming/io/UnboundedSourceWrapper.java    |   2 +-
 .../streaming/UnboundedSourceWrapperTest.java   | 464 ++++++++++---------
 3 files changed, 250 insertions(+), 218 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/c2344e94/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java
index df49a49..909cb0e 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java
@@ -35,7 +35,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Wrapper for executing {@link BoundedSource UnboundedSources} as a Flink Source.
+ * Wrapper for executing {@link BoundedSource BoundedSources} as a Flink Source.
  */
 public class BoundedSourceWrapper<OutputT>
     extends RichParallelSourceFunction<WindowedValue<OutputT>>

http://git-wip-us.apache.org/repos/asf/beam/blob/c2344e94/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
index af955ba..68746b2 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java
@@ -143,7 +143,7 @@ public class UnboundedSourceWrapper<
     } else {
 
       Coder<? extends UnboundedSource<OutputT, CheckpointMarkT>> sourceCoder =
-          SerializableCoder.of(new TypeDescriptor<UnboundedSource<OutputT, CheckpointMarkT>>() {
+          (Coder) SerializableCoder.of(new TypeDescriptor<UnboundedSource>() {
           });
 
       checkpointCoder = (ListCoder) ListCoder.of(KvCoder.of(sourceCoder, checkpointMarkCoder));

http://git-wip-us.apache.org/repos/asf/beam/blob/c2344e94/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
index 9e8261a..b0be98b 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java
@@ -46,259 +46,291 @@ import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.util.InstantiationUtil;
 import org.junit.Test;
+import org.junit.experimental.runners.Enclosed;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
 /**
  * Tests for {@link UnboundedSourceWrapper}.
  */
-@RunWith(Parameterized.class)
+@RunWith(Enclosed.class)
 public class UnboundedSourceWrapperTest {
 
-  private final int numTasks;
-  private final int numSplits;
+  /**
+   * Parameterized tests.
+   */
+  @RunWith(Parameterized.class)
+  public static class UnboundedSourceWrapperTestWithParams {
+    private final int numTasks;
+    private final int numSplits;
+
+    public UnboundedSourceWrapperTestWithParams(int numTasks, int numSplits) {
+      this.numTasks = numTasks;
+      this.numSplits = numSplits;
+    }
 
-  public UnboundedSourceWrapperTest(int numTasks, int numSplits) {
-    this.numTasks = numTasks;
-    this.numSplits = numSplits;
-  }
+    @Parameterized.Parameters
+    public static Collection<Object[]> data() {
+      /*
+       * Parameters for initializing the tests:
+       * {numTasks, numSplits}
+       * The test currently assumes powers of two for some assertions.
+       */
+      return Arrays.asList(new Object[][]{
+          {1, 1}, {1, 2}, {1, 4},
+          {2, 1}, {2, 2}, {2, 4},
+          {4, 1}, {4, 2}, {4, 4}
+      });
+    }
 
-  @Parameterized.Parameters
-  public static Collection<Object[]> data() {
-    /*
-     * Parameters for initializing the tests:
-     * {numTasks, numSplits}
-     * The test currently assumes powers of two for some assertions.
+    /**
+     * Creates a {@link UnboundedSourceWrapper} that has one or multiple readers per source.
+     * If numSplits > numTasks the source has one source will manage multiple readers.
      */
-    return Arrays.asList(new Object[][] {
-      {1, 1}, {1, 2}, {1, 4},
-      {2, 1}, {2, 2}, {2, 4},
-      {4, 1}, {4, 2}, {4, 4}
-    });
-  }
+    @Test
+    public void testReaders() throws Exception {
+      final int numElements = 20;
+      final Object checkpointLock = new Object();
+      PipelineOptions options = PipelineOptionsFactory.create();
+
+      // this source will emit exactly NUM_ELEMENTS across all parallel readers,
+      // afterwards it will stall. We check whether we also receive NUM_ELEMENTS
+      // elements later.
+      TestCountingSource source = new TestCountingSource(numElements);
+      UnboundedSourceWrapper<KV<Integer, Integer>, TestCountingSource.CounterMark> flinkWrapper =
+          new UnboundedSourceWrapper<>(options, source, numSplits);
+
+      assertEquals(numSplits, flinkWrapper.getSplitSources().size());
+
+      StreamSource<WindowedValue<
+          KV<Integer, Integer>>,
+          UnboundedSourceWrapper<
+              KV<Integer, Integer>,
+              TestCountingSource.CounterMark>> sourceOperator = new StreamSource<>(flinkWrapper);
+
+      setupSourceOperator(sourceOperator, numTasks);
+
+      try {
+        sourceOperator.open();
+        sourceOperator.run(checkpointLock,
+            new Output<StreamRecord<WindowedValue<KV<Integer, Integer>>>>() {
+              private int count = 0;
+
+              @Override
+              public void emitWatermark(Watermark watermark) {
+              }
 
-  /**
-   * Creates a {@link UnboundedSourceWrapper} that has one or multiple readers per source.
-   * If numSplits > numTasks the source has one source will manage multiple readers.
-   */
-  @Test
-  public void testReaders() throws Exception {
-    final int numElements = 20;
-    final Object checkpointLock = new Object();
-    PipelineOptions options = PipelineOptionsFactory.create();
-
-    // this source will emit exactly NUM_ELEMENTS across all parallel readers,
-    // afterwards it will stall. We check whether we also receive NUM_ELEMENTS
-    // elements later.
-    TestCountingSource source = new TestCountingSource(numElements);
-    UnboundedSourceWrapper<KV<Integer, Integer>, TestCountingSource.CounterMark> flinkWrapper =
-        new UnboundedSourceWrapper<>(options, source, numSplits);
-
-    assertEquals(numSplits, flinkWrapper.getSplitSources().size());
-
-    StreamSource<WindowedValue<
-        KV<Integer, Integer>>,
-        UnboundedSourceWrapper<
-            KV<Integer, Integer>,
-            TestCountingSource.CounterMark>> sourceOperator = new StreamSource<>(flinkWrapper);
-
-    setupSourceOperator(sourceOperator, numTasks);
-
-    try {
-      sourceOperator.open();
-      sourceOperator.run(checkpointLock,
-          new Output<StreamRecord<WindowedValue<KV<Integer, Integer>>>>() {
-            private int count = 0;
-
-            @Override
-            public void emitWatermark(Watermark watermark) {
-            }
-
-            @Override
-            public void collect(
-                StreamRecord<WindowedValue<KV<Integer, Integer>>> windowedValueStreamRecord) {
-
-              count++;
-              if (count >= numElements) {
-                throw new SuccessException();
+              @Override
+              public void collect(
+                  StreamRecord<WindowedValue<KV<Integer, Integer>>> windowedValueStreamRecord) {
+
+                count++;
+                if (count >= numElements) {
+                  throw new SuccessException();
+                }
               }
-            }
 
-            @Override
-            public void close() {
+              @Override
+              public void close() {
 
-            }
-          });
-    } catch (SuccessException e) {
+              }
+            });
+      } catch (SuccessException e) {
 
-      assertEquals(Math.max(1, numSplits / numTasks), flinkWrapper.getLocalSplitSources().size());
+        assertEquals(Math.max(1, numSplits / numTasks), flinkWrapper.getLocalSplitSources().size());
 
-      // success
-      return;
+        // success
+        return;
+      }
+      fail("Read terminated without producing expected number of outputs");
     }
-    fail("Read terminated without producing expected number of outputs");
-  }
 
-  /**
-   * Verify that snapshot/restore work as expected. We bring up a source and cancel
-   * after seeing a certain number of elements. Then we snapshot that source,
-   * bring up a completely new source that we restore from the snapshot and verify
-   * that we see all expected elements in the end.
-   */
-  @Test
-  public void testRestore() throws Exception {
-    final int numElements = 20;
-    final Object checkpointLock = new Object();
-    PipelineOptions options = PipelineOptionsFactory.create();
-
-    // this source will emit exactly NUM_ELEMENTS across all parallel readers,
-    // afterwards it will stall. We check whether we also receive NUM_ELEMENTS
-    // elements later.
-    TestCountingSource source = new TestCountingSource(numElements);
-    UnboundedSourceWrapper<KV<Integer, Integer>, TestCountingSource.CounterMark> flinkWrapper =
-        new UnboundedSourceWrapper<>(options, source, numSplits);
-
-    assertEquals(numSplits, flinkWrapper.getSplitSources().size());
-
-    StreamSource<
-        WindowedValue<KV<Integer, Integer>>,
-        UnboundedSourceWrapper<
-            KV<Integer, Integer>,
-            TestCountingSource.CounterMark>> sourceOperator = new StreamSource<>(flinkWrapper);
-
-    setupSourceOperator(sourceOperator, numTasks);
-
-    final Set<KV<Integer, Integer>> emittedElements = new HashSet<>();
-
-    boolean readFirstBatchOfElements = false;
-
-    try {
-      sourceOperator.open();
-      sourceOperator.run(checkpointLock,
-          new Output<StreamRecord<WindowedValue<KV<Integer, Integer>>>>() {
-            private int count = 0;
-
-            @Override
-            public void emitWatermark(Watermark watermark) {
-            }
-
-            @Override
-            public void collect(
-                StreamRecord<WindowedValue<KV<Integer, Integer>>> windowedValueStreamRecord) {
-
-              emittedElements.add(windowedValueStreamRecord.getValue().getValue());
-              count++;
-              if (count >= numElements / 2) {
-                throw new SuccessException();
+    /**
+     * Verify that snapshot/restore work as expected. We bring up a source and cancel
+     * after seeing a certain number of elements. Then we snapshot that source,
+     * bring up a completely new source that we restore from the snapshot and verify
+     * that we see all expected elements in the end.
+     */
+    @Test
+    public void testRestore() throws Exception {
+      final int numElements = 20;
+      final Object checkpointLock = new Object();
+      PipelineOptions options = PipelineOptionsFactory.create();
+
+      // this source will emit exactly NUM_ELEMENTS across all parallel readers,
+      // afterwards it will stall. We check whether we also receive NUM_ELEMENTS
+      // elements later.
+      TestCountingSource source = new TestCountingSource(numElements);
+      UnboundedSourceWrapper<KV<Integer, Integer>, TestCountingSource.CounterMark> flinkWrapper =
+          new UnboundedSourceWrapper<>(options, source, numSplits);
+
+      assertEquals(numSplits, flinkWrapper.getSplitSources().size());
+
+      StreamSource<
+          WindowedValue<KV<Integer, Integer>>,
+          UnboundedSourceWrapper<
+              KV<Integer, Integer>,
+              TestCountingSource.CounterMark>> sourceOperator = new StreamSource<>(flinkWrapper);
+
+      setupSourceOperator(sourceOperator, numTasks);
+
+      final Set<KV<Integer, Integer>> emittedElements = new HashSet<>();
+
+      boolean readFirstBatchOfElements = false;
+
+      try {
+        sourceOperator.open();
+        sourceOperator.run(checkpointLock,
+            new Output<StreamRecord<WindowedValue<KV<Integer, Integer>>>>() {
+              private int count = 0;
+
+              @Override
+              public void emitWatermark(Watermark watermark) {
               }
-            }
 
-            @Override
-            public void close() {
+              @Override
+              public void collect(
+                  StreamRecord<WindowedValue<KV<Integer, Integer>>> windowedValueStreamRecord) {
 
-            }
-          });
-    } catch (SuccessException e) {
-      // success
-      readFirstBatchOfElements = true;
-    }
+                emittedElements.add(windowedValueStreamRecord.getValue().getValue());
+                count++;
+                if (count >= numElements / 2) {
+                  throw new SuccessException();
+                }
+              }
+
+              @Override
+              public void close() {
+
+              }
+            });
+      } catch (SuccessException e) {
+        // success
+        readFirstBatchOfElements = true;
+      }
+
+      assertTrue("Did not successfully read first batch of elements.", readFirstBatchOfElements);
+
+      // draw a snapshot
+      byte[] snapshot = flinkWrapper.snapshotState(0, 0);
+
+      // test that finalizeCheckpoint on CheckpointMark is called
+      final ArrayList<Integer> finalizeList = new ArrayList<>();
+      TestCountingSource.setFinalizeTracker(finalizeList);
+      flinkWrapper.notifyCheckpointComplete(0);
+      assertEquals(flinkWrapper.getLocalSplitSources().size(), finalizeList.size());
+
+      // create a completely new source but restore from the snapshot
+      TestCountingSource restoredSource = new TestCountingSource(numElements);
+      UnboundedSourceWrapper<
+          KV<Integer, Integer>, TestCountingSource.CounterMark> restoredFlinkWrapper =
+          new UnboundedSourceWrapper<>(options, restoredSource, numSplits);
+
+      assertEquals(numSplits, restoredFlinkWrapper.getSplitSources().size());
+
+      StreamSource<
+          WindowedValue<KV<Integer, Integer>>,
+          UnboundedSourceWrapper<
+              KV<Integer, Integer>,
+              TestCountingSource.CounterMark>> restoredSourceOperator =
+          new StreamSource<>(restoredFlinkWrapper);
+
+      setupSourceOperator(restoredSourceOperator, numTasks);
+
+      // restore snapshot
+      restoredFlinkWrapper.restoreState(snapshot);
+
+      boolean readSecondBatchOfElements = false;
+
+      // run again and verify that we see the other elements
+      try {
+        restoredSourceOperator.open();
+        restoredSourceOperator.run(checkpointLock,
+            new Output<StreamRecord<WindowedValue<KV<Integer, Integer>>>>() {
+              private int count = 0;
+
+              @Override
+              public void emitWatermark(Watermark watermark) {
+              }
 
-    assertTrue("Did not successfully read first batch of elements.", readFirstBatchOfElements);
-
-    // draw a snapshot
-    byte[] snapshot = flinkWrapper.snapshotState(0, 0);
-
-    // test that finalizeCheckpoint on CheckpointMark is called
-    final ArrayList<Integer> finalizeList = new ArrayList<>();
-    TestCountingSource.setFinalizeTracker(finalizeList);
-    flinkWrapper.notifyCheckpointComplete(0);
-    assertEquals(flinkWrapper.getLocalSplitSources().size(), finalizeList.size());
-
-    // create a completely new source but restore from the snapshot
-    TestCountingSource restoredSource = new TestCountingSource(numElements);
-    UnboundedSourceWrapper<
-        KV<Integer, Integer>, TestCountingSource.CounterMark> restoredFlinkWrapper =
-        new UnboundedSourceWrapper<>(options, restoredSource, numSplits);
-
-    assertEquals(numSplits, restoredFlinkWrapper.getSplitSources().size());
-
-    StreamSource<
-        WindowedValue<KV<Integer, Integer>>,
-        UnboundedSourceWrapper<
-            KV<Integer, Integer>,
-            TestCountingSource.CounterMark>> restoredSourceOperator =
-        new StreamSource<>(restoredFlinkWrapper);
-
-    setupSourceOperator(restoredSourceOperator, numTasks);
-
-    // restore snapshot
-    restoredFlinkWrapper.restoreState(snapshot);
-
-    boolean readSecondBatchOfElements = false;
-
-    // run again and verify that we see the other elements
-    try {
-      restoredSourceOperator.open();
-      restoredSourceOperator.run(checkpointLock,
-          new Output<StreamRecord<WindowedValue<KV<Integer, Integer>>>>() {
-            private int count = 0;
-
-            @Override
-            public void emitWatermark(Watermark watermark) {
-            }
-
-            @Override
-            public void collect(
-                StreamRecord<WindowedValue<KV<Integer, Integer>>> windowedValueStreamRecord) {
-              emittedElements.add(windowedValueStreamRecord.getValue().getValue());
-              count++;
-              if (count >= numElements / 2) {
-                throw new SuccessException();
+              @Override
+              public void collect(
+                  StreamRecord<WindowedValue<KV<Integer, Integer>>> windowedValueStreamRecord) {
+                emittedElements.add(windowedValueStreamRecord.getValue().getValue());
+                count++;
+                if (count >= numElements / 2) {
+                  throw new SuccessException();
+                }
               }
-            }
 
-            @Override
-            public void close() {
+              @Override
+              public void close() {
 
-            }
-          });
-    } catch (SuccessException e) {
-      // success
-      readSecondBatchOfElements = true;
-    }
+              }
+            });
+      } catch (SuccessException e) {
+        // success
+        readSecondBatchOfElements = true;
+      }
 
-    assertEquals(Math.max(1, numSplits / numTasks), flinkWrapper.getLocalSplitSources().size());
+      assertEquals(Math.max(1, numSplits / numTasks), flinkWrapper.getLocalSplitSources().size());
 
-    assertTrue("Did not successfully read second batch of elements.", readSecondBatchOfElements);
+      assertTrue("Did not successfully read second batch of elements.", readSecondBatchOfElements);
 
-    // verify that we saw all NUM_ELEMENTS elements
-    assertTrue(emittedElements.size() == numElements);
-  }
+      // verify that we saw all NUM_ELEMENTS elements
+      assertTrue(emittedElements.size() == numElements);
+    }
 
-  @SuppressWarnings("unchecked")
-  private static <T> void setupSourceOperator(StreamSource<T, ?> operator, int numSubTasks) {
-    ExecutionConfig executionConfig = new ExecutionConfig();
-    StreamConfig cfg = new StreamConfig(new Configuration());
+    @SuppressWarnings("unchecked")
+    private static <T> void setupSourceOperator(StreamSource<T, ?> operator, int numSubTasks) {
+      ExecutionConfig executionConfig = new ExecutionConfig();
+      StreamConfig cfg = new StreamConfig(new Configuration());
 
-    cfg.setTimeCharacteristic(TimeCharacteristic.EventTime);
+      cfg.setTimeCharacteristic(TimeCharacteristic.EventTime);
 
-    Environment env = new DummyEnvironment("MockTwoInputTask", numSubTasks, 0);
+      Environment env = new DummyEnvironment("MockTwoInputTask", numSubTasks, 0);
 
-    StreamTask<?, ?> mockTask = mock(StreamTask.class);
-    when(mockTask.getName()).thenReturn("Mock Task");
-    when(mockTask.getCheckpointLock()).thenReturn(new Object());
-    when(mockTask.getConfiguration()).thenReturn(cfg);
-    when(mockTask.getEnvironment()).thenReturn(env);
-    when(mockTask.getExecutionConfig()).thenReturn(executionConfig);
-    when(mockTask.getAccumulatorMap())
-        .thenReturn(Collections.<String, Accumulator<?, ?>>emptyMap());
+      StreamTask<?, ?> mockTask = mock(StreamTask.class);
+      when(mockTask.getName()).thenReturn("Mock Task");
+      when(mockTask.getCheckpointLock()).thenReturn(new Object());
+      when(mockTask.getConfiguration()).thenReturn(cfg);
+      when(mockTask.getEnvironment()).thenReturn(env);
+      when(mockTask.getExecutionConfig()).thenReturn(executionConfig);
+      when(mockTask.getAccumulatorMap())
+          .thenReturn(Collections.<String, Accumulator<?, ?>>emptyMap());
 
-    operator.setup(mockTask, cfg, (Output< StreamRecord<T>>) mock(Output.class));
+      operator.setup(mockTask, cfg, (Output<StreamRecord<T>>) mock(Output.class));
+    }
+
+    /**
+     * A special {@link RuntimeException} that we throw to signal that the test was successful.
+     */
+    private static class SuccessException extends RuntimeException {
+    }
   }
 
   /**
-   * A special {@link RuntimeException} that we throw to signal that the test was successful.
+   * Not parameterized tests.
    */
-  private static class SuccessException extends RuntimeException {}
+  public static class BasicTest {
+
+    /**
+     * Check serialization a {@link UnboundedSourceWrapper}.
+     */
+    @Test
+    public void testSerialization() throws Exception {
+      final int parallelism = 1;
+      final int numElements = 20;
+      PipelineOptions options = PipelineOptionsFactory.create();
+
+      TestCountingSource source = new TestCountingSource(numElements);
+      UnboundedSourceWrapper<KV<Integer, Integer>, TestCountingSource.CounterMark> flinkWrapper =
+          new UnboundedSourceWrapper<>(options, source, parallelism);
+
+      InstantiationUtil.serializeObject(flinkWrapper);
+    }
+
+  }
 }


[4/4] beam git commit: This closes #1765

Posted by mx...@apache.org.
This closes #1765


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/eaf4450f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/eaf4450f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/eaf4450f

Branch: refs/heads/master
Commit: eaf4450f277ac7ab52fdba88c25415d5a0246c62
Parents: d3b126f 078573e
Author: Maximilian Michels <mx...@apache.org>
Authored: Wed Jan 11 19:25:33 2017 +0100
Committer: Maximilian Michels <mx...@apache.org>
Committed: Fri Jan 13 19:27:48 2017 +0100

----------------------------------------------------------------------
 .../beam/runners/flink/examples/streaming/KafkaIOExamples.java   | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------



[3/4] beam git commit: [BEAM-1229] flink KafkaIOExamples submit error

Posted by mx...@apache.org.
[BEAM-1229] flink KafkaIOExamples submit error


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/078573e3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/078573e3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/078573e3

Branch: refs/heads/master
Commit: 078573e30b4c9cb29b3c548f8859bb5b23a7a9d1
Parents: 51820cb
Author: Alexey Diomin <di...@gmail.com>
Authored: Wed Jan 11 17:08:35 2017 +0400
Committer: Maximilian Michels <mx...@apache.org>
Committed: Fri Jan 13 19:27:04 2017 +0100

----------------------------------------------------------------------
 .../beam/runners/flink/examples/streaming/KafkaIOExamples.java   | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/078573e3/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java
index 3c8a89b..616e276 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java
@@ -29,6 +29,7 @@ import org.apache.beam.runners.flink.translation.wrappers.streaming.io.Unbounded
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.io.Write;
 import org.apache.beam.sdk.options.Default;
@@ -78,7 +79,7 @@ public class KafkaIOExamples {
                 new SimpleStringSchema(), getKafkaProps(options));
 
         p
-            .apply(Read.from(UnboundedFlinkSource.of(kafkaConsumer)))
+            .apply(Read.from(UnboundedFlinkSource.of(kafkaConsumer))).setCoder(StringUtf8Coder.of())
             .apply(ParDo.of(new PrintFn<>()));
 
         p.run();
@@ -133,6 +134,7 @@ public class KafkaIOExamples {
 
         p
             .apply(Read.from(UnboundedFlinkSource.of(kafkaConsumer)))
+                .setCoder(AvroCoder.of(MyType.class))
             .apply(ParDo.of(new PrintFn<>()));
 
         p.run();


[2/4] beam git commit: This closes #1770

Posted by mx...@apache.org.
This closes #1770


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d3b126f5
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d3b126f5
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d3b126f5

Branch: refs/heads/master
Commit: d3b126f5159a2802f91e302d120187d781826271
Parents: f1ea8f9 c2344e9
Author: Maximilian Michels <mx...@apache.org>
Authored: Tue Jan 10 19:25:33 2017 +0100
Committer: Maximilian Michels <mx...@apache.org>
Committed: Fri Jan 13 19:26:25 2017 +0100

----------------------------------------------------------------------
 .../streaming/io/BoundedSourceWrapper.java      |   2 +-
 .../streaming/io/UnboundedSourceWrapper.java    |   2 +-
 .../streaming/UnboundedSourceWrapperTest.java   | 464 ++++++++++---------
 3 files changed, 250 insertions(+), 218 deletions(-)
----------------------------------------------------------------------