Posted to commits@beam.apache.org by re...@apache.org on 2017/10/17 15:17:43 UTC

[08/13] beam git commit: Add runner check. Add withReadCommitted() to reader.

Add runner check. Add withReadCommitted() to reader.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5e280fad
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5e280fad
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5e280fad

Branch: refs/heads/master
Commit: 5e280fad28b97c26f27bfb037ef7f45b8fe034e5
Parents: 265e7c7
Author: Raghu Angadi <ra...@google.com>
Authored: Wed Oct 4 17:43:47 2017 -0700
Committer: Raghu Angadi <ra...@google.com>
Committed: Tue Oct 17 00:02:05 2017 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/io/kafka/KafkaIO.java   | 31 ++++++++++++++++++++
 .../apache/beam/sdk/io/kafka/KafkaIOTest.java   | 11 ++++---
 2 files changed, 38 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
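
The new reader option is a one-line opt-in on the read transform. A minimal usage sketch
(bootstrap servers, topic name, and deserializers below are placeholders, not part of this
commit):

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.kafka.KafkaIO;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.kafka.common.serialization.LongDeserializer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    // Reads only messages committed by Kafka 0.11+ transactional producers.
    static PCollection<KV<Long, String>> readCommitted(Pipeline pipeline) {
      return pipeline.apply(
          KafkaIO.<Long, String>read()
              .withBootstrapServers("broker-1:9092")             // placeholder address
              .withTopic("transactional-topic")                  // placeholder topic
              .withKeyDeserializer(LongDeserializer.class)
              .withValueDeserializer(StringDeserializer.class)
              .withReadCommitted()                               // added in this commit
              .withoutMetadata());
    }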


http://git-wip-us.apache.org/repos/asf/beam/blob/5e280fad/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 76e8a1e..b9b6bd4 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -549,6 +549,18 @@ public class KafkaIO {
     }
 
     /**
+     * Sets "isolation.level" to "read_committed" in the Kafka consumer configuration. This
+     * ensures that the consumer does not read uncommitted messages. Kafka version 0.11
+     * introduced transactional writes. Applications requiring end-to-end exactly-once
+     * semantics should only read committed messages. See the {@link KafkaConsumer} JavaDoc
+     * for more details.
+     */
+    public Read<K, V> withReadCommitted() {
+      return updateConsumerProperties(
+        ImmutableMap.<String, Object>of("isolation.level", "read_committed"));
+    }
+
+    /**
      * Returns a {@link PTransform} for a PCollection of {@link KV}, dropping Kafka metadata.
      */
     public PTransform<PBegin, PCollection<KV<K, V>>> withoutMetadata() {
@@ -1275,6 +1287,9 @@ public class KafkaIO {
     private void updateLatestOffsets() {
       for (PartitionState p : partitionStates) {
         try {
+          // If "read_committed" is enabled in the config, this seeks to 'Last Stable Offset'.
+          // As a result, uncommitted messages are not counted in the backlog. This is correct
+          // since the reader cannot read them anyway.
           consumerSpEL.evaluateSeek2End(offsetConsumer, p.topicPartition);
           long offset = offsetConsumer.position(p.topicPartition);
           p.setLatestOffset(offset);
@@ -1573,6 +1588,22 @@ public class KafkaIO {
       return PDone.in(input.getPipeline());
     }
 
+    public void validate(PipelineOptions options) {
+      if (isEOS()) {
+        String runner = options.getRunner().getName();
+        if (runner.equals("org.apache.beam.runners.direct.DirectRunner")
+          || runner.startsWith("org.apache.beam.runners.dataflow.")
+          || runner.startsWith("org.apache.beam.runners.spark.")) {
+          return;
+        }
+        throw new UnsupportedOperationException(
+          runner + " is not whitelisted among runners compatible with the Kafka exactly-once "
+          + "sink. This implementation of the exactly-once sink relies on specific checkpoint "
+          + "guarantees. Only runners known to have compatible checkpoint semantics are whitelisted."
+        );
+      }
+    }
+
     // set config defaults
     private static final Map<String, Object> DEFAULT_PRODUCER_PROPERTIES =
         ImmutableMap.<String, Object>of(

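The backlog comment in the hunk above refers to Kafka's 'Last Stable Offset' (LSO): with
"isolation.level" set to "read_committed", seeking to the end of a partition positions the
consumer at the LSO rather than the log-end offset. A standalone sketch of that behavior
with a plain KafkaConsumer (broker address, topic, and partition are placeholders):

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    public class LsoBacklogSketch {
      public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-1:9092");  // placeholder
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                  ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                  ByteArrayDeserializer.class.getName());
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
          TopicPartition tp = new TopicPartition("transactional-topic", 0);   // placeholder
          consumer.assign(Collections.singletonList(tp));
          consumer.seekToEnd(Collections.singletonList(tp));
          // Under read_committed, position() after seekToEnd() returns the Last Stable
          // Offset, so uncommitted transactional messages do not inflate the backlog.
          System.out.println("Last stable offset: " + consumer.position(tp));
        }
      }
    }
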
http://git-wip-us.apache.org/repos/asf/beam/blob/5e280fad/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
index 1532788..b1e4a4d 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -791,9 +791,13 @@ public class KafkaIOTest {
   }
 
   @Test
-  public void testEOSink() throws Exception {
-    // similar to testSink(), enables to EOS.
-    // This does not actually test exactly-once-semantics. Mainly exercises the code.
+  public void testEOSink() {
+    // testSink() with EOS enabled.
+    // This does not actually inject retries in a stage to test exactly-once semantics.
+    // It mainly exercises the code in the normal flow without retries.
+    // Ideally we should test the EOS sink by triggering replays of messages between stages.
+    // It is not feasible to test such retries with the direct runner. When DoFnTester supports
+    // state, we can test the KafkaEOWriter DoFn directly to ensure it handles retries correctly.
 
     if (!ProducerSpEL.supportsTransactions()) {
       LOG.warn("testEOSink() is disabled as Kafka client version does not support transactions.");
@@ -832,7 +836,6 @@ public class KafkaIOTest {
     }
   }
 
-
   @Test
   public void testSinkWithSendErrors() throws Throwable {
     // similar to testSink(), except that up to 10 of the send calls to producer will fail
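
For completeness, the runner check added in validate() above is exercised when a pipeline
requests the exactly-once sink. A hedged sketch of how that is done (withEOS() is introduced
elsewhere in this commit series; servers, topic, and sink group id are placeholders):

    import org.apache.beam.sdk.io.kafka.KafkaIO;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.kafka.common.serialization.LongSerializer;
    import org.apache.kafka.common.serialization.StringSerializer;

    // Requests the exactly-once sink; validate() rejects runners that are not whitelisted.
    static void writeExactlyOnce(PCollection<KV<Long, String>> results) {
      results.apply(KafkaIO.<Long, String>write()
          .withBootstrapServers("broker-1:9092")      // placeholder address
          .withTopic("results-topic")                 // placeholder topic
          .withKeySerializer(LongSerializer.class)
          .withValueSerializer(StringSerializer.class)
          .withEOS(1, "eos-sink-group-id"));          // numShards, sinkGroupId (assumed API)
    }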