You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ar...@apache.org on 2019/01/30 09:28:27 UTC

[beam] branch master updated: [BEAM-6285] add parameters for offsetConsumer in KafkaIO.read()

This is an automated email from the ASF dual-hosted git repository.

aromanenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 7f17201  [BEAM-6285] add parameters for offsetConsumer in KafkaIO.read()
7f17201 is described below

commit 7f17201640881e7f4bbf85c1d337735ba66168d6
Author: XuMingmin <Xu...@users.noreply.github.com>
AuthorDate: Wed Jan 30 01:28:14 2019 -0800

    [BEAM-6285] add parameters for offsetConsumer in KafkaIO.read()
---
 .../java/org/apache/beam/sdk/io/kafka/KafkaIO.java | 23 +++++++++
 .../beam/sdk/io/kafka/KafkaUnboundedReader.java    | 56 +++++++++++++---------
 .../org/apache/beam/sdk/io/kafka/KafkaIOTest.java  | 43 +++++++++++++++++
 3 files changed, 100 insertions(+), 22 deletions(-)

diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 8b3218b..f27ec68 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -345,6 +345,9 @@ public class KafkaIO {
 
     abstract TimestampPolicyFactory<K, V> getTimestampPolicyFactory();
 
+    @Nullable
+    abstract Map<String, Object> getOffsetConsumerConfig();
+
     abstract Builder<K, V> toBuilder();
 
     @AutoValue.Builder
@@ -380,6 +383,8 @@ public class KafkaIO {
       abstract Builder<K, V> setTimestampPolicyFactory(
           TimestampPolicyFactory<K, V> timestampPolicyFactory);
 
+      abstract Builder<K, V> setOffsetConsumerConfig(Map<String, Object> offsetConsumerConfig);
+
       abstract Read<K, V> build();
     }
 
@@ -656,6 +661,24 @@ public class KafkaIO {
       return toBuilder().setCommitOffsetsInFinalizeEnabled(true).build();
     }
 
+    /**
+     * Set additional configuration for the backend offset consumer. It may be required for a
+     * secured Kafka cluster, especially when you see similar WARN log message 'exception while
+     * fetching latest offset for partition {}. will be retried'.
+     *
+     * <p>In {@link KafkaIO#read()}, there're two consumers running in the backend actually:<br>
+     * 1. the main consumer, which reads data from kafka;<br>
+     * 2. the secondary offset consumer, which is used to estimate backlog, by fetching latest
+     * offset;<br>
+     *
+     * <p>By default, offset consumer inherits the configuration from main consumer, with an
+     * auto-generated {@link ConsumerConfig#GROUP_ID_CONFIG}. This may not work in a secured Kafka
+     * which requires more configurations.
+     */
+    public Read<K, V> withOffsetConsumerConfigOverrides(Map<String, Object> offsetConsumerConfig) {
+      return toBuilder().setOffsetConsumerConfig(offsetConsumerConfig).build();
+    }
+
     /** Returns a {@link PTransform} for PCollection of {@link KV}, dropping Kafka metatdata. */
     public PTransform<PBegin, PCollection<KV<K, V>>> withoutMetadata() {
       return new TypedWithoutMetadata<>(this);
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
index ee058aa..580b0bc 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
@@ -141,28 +141,7 @@ class KafkaUnboundedReader<K, V> extends UnboundedReader<KafkaRecord<K, V>> {
     consumerPollThread.submit(this::consumerPollLoop);
 
     // offsetConsumer setup :
-
-    Object groupId = spec.getConsumerConfig().get(ConsumerConfig.GROUP_ID_CONFIG);
-    // override group_id and disable auto_commit so that it does not interfere with main consumer
-    String offsetGroupId =
-        String.format(
-            "%s_offset_consumer_%d_%s",
-            name, (new Random()).nextInt(Integer.MAX_VALUE), (groupId == null ? "none" : groupId));
-    Map<String, Object> offsetConsumerConfig = new HashMap<>(spec.getConsumerConfig());
-    offsetConsumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, offsetGroupId);
-    offsetConsumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
-    // Force read isolation level to 'read_uncommitted' for offset consumer. This consumer
-    // fetches latest offset for two reasons : (a) to calculate backlog (number of records
-    // yet to be consumed) (b) to advance watermark if the backlog is zero. The right thing to do
-    // for (a) is to leave this config unchanged from the main config (i.e. if there are records
-    // that can't be read because of uncommitted records before them, they shouldn't
-    // ideally count towards backlog when "read_committed" is enabled. But (b)
-    // requires finding out if there are any records left to be read (committed or uncommitted).
-    // Rather than using two separate consumers we will go with better support for (b). If we do
-    // hit a case where a lot of records are not readable (due to some stuck transactions), the
-    // pipeline would report more backlog, but would not be able to consume it. It might be ok
-    // since CPU consumed on the workers would be low and will likely avoid unnecessary upscale.
-    offsetConsumerConfig.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_uncommitted");
+    Map<String, Object> offsetConsumerConfig = getOffsetConsumerConfig();
 
     offsetConsumer = spec.getConsumerFactoryFn().apply(offsetConsumerConfig);
     consumerSpEL.evaluateAssign(offsetConsumer, spec.getTopicPartitions());
@@ -726,6 +705,39 @@ class KafkaUnboundedReader<K, V> extends UnboundedReader<KafkaRecord<K, V>> {
     return backlogCount;
   }
 
+  @VisibleForTesting
+  Map<String, Object> getOffsetConsumerConfig() {
+    Map<String, Object> offsetConsumerConfig = new HashMap<>(source.getSpec().getConsumerConfig());
+    offsetConsumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+
+    Object groupId = source.getSpec().getConsumerConfig().get(ConsumerConfig.GROUP_ID_CONFIG);
+    // override group_id and disable auto_commit so that it does not interfere with main consumer
+    String offsetGroupId =
+        String.format(
+            "%s_offset_consumer_%d_%s",
+            name, (new Random()).nextInt(Integer.MAX_VALUE), (groupId == null ? "none" : groupId));
+    offsetConsumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, offsetGroupId);
+
+    if (source.getSpec().getOffsetConsumerConfig() != null) {
+      offsetConsumerConfig.putAll(source.getSpec().getOffsetConsumerConfig());
+    }
+
+    // Force read isolation level to 'read_uncommitted' for offset consumer. This consumer
+    // fetches latest offset for two reasons : (a) to calculate backlog (number of records
+    // yet to be consumed) (b) to advance watermark if the backlog is zero. The right thing to do
+    // for (a) is to leave this config unchanged from the main config (i.e. if there are records
+    // that can't be read because of uncommitted records before them, they shouldn't
+    // ideally count towards backlog when "read_committed" is enabled. But (b)
+    // requires finding out if there are any records left to be read (committed or uncommitted).
+    // Rather than using two separate consumers we will go with better support for (b). If we do
+    // hit a case where a lot of records are not readable (due to some stuck transactions), the
+    // pipeline would report more backlog, but would not be able to consume it. It might be ok
+    // since CPU consumed on the workers would be low and will likely avoid unnecessary upscale.
+    offsetConsumerConfig.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_uncommitted");
+
+    return offsetConsumerConfig;
+  }
+
   @Override
   public void close() throws IOException {
     closed.set(true);
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
index 63e14a3..e6c3ca9 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -1528,6 +1528,49 @@ public class KafkaIOTest {
     }
   }
 
+  @Test
+  public void testOffsetConsumerConfigOverrides() throws Exception {
+    KafkaUnboundedReader reader1 =
+        new KafkaUnboundedReader(
+            new KafkaUnboundedSource(
+                KafkaIO.read()
+                    .withBootstrapServers("broker_1:9092,broker_2:9092")
+                    .withTopic("my_topic")
+                    .withOffsetConsumerConfigOverrides(null),
+                0),
+            null);
+    assertTrue(
+        reader1
+            .getOffsetConsumerConfig()
+            .get(ConsumerConfig.GROUP_ID_CONFIG)
+            .toString()
+            .matches(".*_offset_consumer_\\d+_none"));
+    assertEquals(
+        false, reader1.getOffsetConsumerConfig().get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG));
+    assertEquals(
+        "read_uncommitted",
+        reader1.getOffsetConsumerConfig().get(ConsumerConfig.ISOLATION_LEVEL_CONFIG));
+
+    String offsetGroupId = "group.offsetConsumer";
+    KafkaUnboundedReader reader2 =
+        new KafkaUnboundedReader(
+            new KafkaUnboundedSource(
+                KafkaIO.read()
+                    .withBootstrapServers("broker_1:9092,broker_2:9092")
+                    .withTopic("my_topic")
+                    .withOffsetConsumerConfigOverrides(
+                        ImmutableMap.of(ConsumerConfig.GROUP_ID_CONFIG, offsetGroupId)),
+                0),
+            null);
+    assertEquals(
+        offsetGroupId, reader2.getOffsetConsumerConfig().get(ConsumerConfig.GROUP_ID_CONFIG));
+    assertEquals(
+        false, reader2.getOffsetConsumerConfig().get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG));
+    assertEquals(
+        "read_uncommitted",
+        reader2.getOffsetConsumerConfig().get(ConsumerConfig.ISOLATION_LEVEL_CONFIG));
+  }
+
   private static void verifyProducerRecords(
       MockProducer<Integer, Long> mockProducer,
       String topic,