You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text rendering).
Posted to commits@beam.apache.org by ar...@apache.org on 2019/01/30 09:28:27 UTC
[beam] branch master updated: [BEAM-6285] add parameters for offsetConsumer in KafkaIO.read()
This is an automated email from the ASF dual-hosted git repository.
aromanenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 7f17201 [BEAM-6285] add parameters for offsetConsumer in KafkaIO.read()
7f17201 is described below
commit 7f17201640881e7f4bbf85c1d337735ba66168d6
Author: XuMingmin <Xu...@users.noreply.github.com>
AuthorDate: Wed Jan 30 01:28:14 2019 -0800
[BEAM-6285] add parameters for offsetConsumer in KafkaIO.read()
---
.../java/org/apache/beam/sdk/io/kafka/KafkaIO.java | 23 +++++++++
.../beam/sdk/io/kafka/KafkaUnboundedReader.java | 56 +++++++++++++---------
.../org/apache/beam/sdk/io/kafka/KafkaIOTest.java | 43 +++++++++++++++++
3 files changed, 100 insertions(+), 22 deletions(-)
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 8b3218b..f27ec68 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -345,6 +345,9 @@ public class KafkaIO {
abstract TimestampPolicyFactory<K, V> getTimestampPolicyFactory();
+ /** Overrides for the backend offset consumer's config; {@code null} when none were set. */
+ @Nullable
+ abstract Map<String, Object> getOffsetConsumerConfig();
+
abstract Builder<K, V> toBuilder();
@AutoValue.Builder
@@ -380,6 +383,8 @@ public class KafkaIO {
abstract Builder<K, V> setTimestampPolicyFactory(
TimestampPolicyFactory<K, V> timestampPolicyFactory);
+ /** Sets the value returned by {@code getOffsetConsumerConfig()}. */
+ abstract Builder<K, V> setOffsetConsumerConfig(Map<String, Object> offsetConsumerConfig);
+
abstract Read<K, V> build();
}
@@ -656,6 +661,24 @@ public class KafkaIO {
return toBuilder().setCommitOffsetsInFinalizeEnabled(true).build();
}
+ /**
+ * Sets additional configuration for the backend offset consumer. This may be required for a
+ * secured Kafka cluster, especially when you see WARN log messages similar to 'exception while
+ * fetching latest offset for partition {}. will be retried'.
+ *
+ * <p>{@link KafkaIO#read()} actually runs two consumers in the backend:
+ *
+ * <ol>
+ *   <li>the main consumer, which reads data from Kafka;
+ *   <li>the secondary offset consumer, which is used to estimate backlog by fetching the latest
+ *       offset.
+ * </ol>
+ *
+ * <p>By default, the offset consumer inherits the configuration of the main consumer, with
+ * auto-commit disabled and an auto-generated {@link ConsumerConfig#GROUP_ID_CONFIG}. This may
+ * not work in a secured Kafka cluster that requires additional configuration.
+ *
+ * <p>The given entries are applied on top of those defaults, so they can replace inherited or
+ * generated values. The only exception is {@link ConsumerConfig#ISOLATION_LEVEL_CONFIG}, which
+ * is always forced to {@code read_uncommitted} for the offset consumer.
+ *
+ * @param offsetConsumerConfig overrides for the offset consumer's configuration; may be {@code
+ *     null}, in which case no overrides are applied
+ * @return a new {@link Read} with the overrides set
+ */
+ public Read<K, V> withOffsetConsumerConfigOverrides(Map<String, Object> offsetConsumerConfig) {
+ return toBuilder().setOffsetConsumerConfig(offsetConsumerConfig).build();
+ }
+
/** Returns a {@link PTransform} for PCollection of {@link KV}, dropping Kafka metatdata. */
public PTransform<PBegin, PCollection<KV<K, V>>> withoutMetadata() {
return new TypedWithoutMetadata<>(this);
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
index ee058aa..580b0bc 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
@@ -141,28 +141,7 @@ class KafkaUnboundedReader<K, V> extends UnboundedReader<KafkaRecord<K, V>> {
consumerPollThread.submit(this::consumerPollLoop);
// offsetConsumer setup :
-
- Object groupId = spec.getConsumerConfig().get(ConsumerConfig.GROUP_ID_CONFIG);
- // override group_id and disable auto_commit so that it does not interfere with main consumer
- String offsetGroupId =
- String.format(
- "%s_offset_consumer_%d_%s",
- name, (new Random()).nextInt(Integer.MAX_VALUE), (groupId == null ? "none" : groupId));
- Map<String, Object> offsetConsumerConfig = new HashMap<>(spec.getConsumerConfig());
- offsetConsumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, offsetGroupId);
- offsetConsumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
- // Force read isolation level to 'read_uncommitted' for offset consumer. This consumer
- // fetches latest offset for two reasons : (a) to calculate backlog (number of records
- // yet to be consumed) (b) to advance watermark if the backlog is zero. The right thing to do
- // for (a) is to leave this config unchanged from the main config (i.e. if there are records
- // that can't be read because of uncommitted records before them, they shouldn't
- // ideally count towards backlog when "read_committed" is enabled. But (b)
- // requires finding out if there are any records left to be read (committed or uncommitted).
- // Rather than using two separate consumers we will go with better support for (b). If we do
- // hit a case where a lot of records are not readable (due to some stuck transactions), the
- // pipeline would report more backlog, but would not be able to consume it. It might be ok
- // since CPU consumed on the workers would be low and will likely avoid unnecessary upscale.
- offsetConsumerConfig.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_uncommitted");
+ Map<String, Object> offsetConsumerConfig = getOffsetConsumerConfig();
offsetConsumer = spec.getConsumerFactoryFn().apply(offsetConsumerConfig);
consumerSpEL.evaluateAssign(offsetConsumer, spec.getTopicPartitions());
@@ -726,6 +705,39 @@ class KafkaUnboundedReader<K, V> extends UnboundedReader<KafkaRecord<K, V>> {
return backlogCount;
}
+ /**
+ * Builds the configuration for the secondary offset consumer, which fetches latest offsets.
+ *
+ * <p>The steps are applied in order: copy the main consumer config, disable auto-commit, set a
+ * generated group id, apply any user overrides from {@code
+ * source.getSpec().getOffsetConsumerConfig()}, and finally force the isolation level to {@code
+ * read_uncommitted}. Overrides therefore win over everything except the isolation level.
+ *
+ * <p>A new random suffix is drawn for the generated group id on every call.
+ *
+ * @return a new mutable map; the main consumer config itself is not modified
+ */
+ @VisibleForTesting
+ Map<String, Object> getOffsetConsumerConfig() {
+ Map<String, Object> offsetConsumerConfig = new HashMap<>(source.getSpec().getConsumerConfig());
+ // disable auto_commit so that the offset consumer does not interfere with the main consumer
+ offsetConsumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+
+ Object groupId = source.getSpec().getConsumerConfig().get(ConsumerConfig.GROUP_ID_CONFIG);
+ // override group_id with a unique value so that it does not interfere with the main consumer;
+ // the main group id (or "none") is kept as a suffix for readability
+ String offsetGroupId =
+ String.format(
+ "%s_offset_consumer_%d_%s",
+ name, (new Random()).nextInt(Integer.MAX_VALUE), (groupId == null ? "none" : groupId));
+ offsetConsumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, offsetGroupId);
+
+ // user-supplied overrides take precedence over the defaults set above
+ if (source.getSpec().getOffsetConsumerConfig() != null) {
+ offsetConsumerConfig.putAll(source.getSpec().getOffsetConsumerConfig());
+ }
+
+ // Force read isolation level to 'read_uncommitted' for offset consumer. This consumer
+ // fetches latest offset for two reasons : (a) to calculate backlog (number of records
+ // yet to be consumed) (b) to advance watermark if the backlog is zero. The right thing to do
+ // for (a) is to leave this config unchanged from the main config (i.e. if there are records
+ // that can't be read because of uncommitted records before them, they shouldn't
+ // ideally count towards backlog when "read_committed" is enabled. But (b)
+ // requires finding out if there are any records left to be read (committed or uncommitted).
+ // Rather than using two separate consumers we will go with better support for (b). If we do
+ // hit a case where a lot of records are not readable (due to some stuck transactions), the
+ // pipeline would report more backlog, but would not be able to consume it. It might be ok
+ // since CPU consumed on the workers would be low and will likely avoid unnecessary upscale.
+ // This is applied after the overrides, so it cannot be changed by the user.
+ offsetConsumerConfig.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_uncommitted");
+
+ return offsetConsumerConfig;
+ }
+
@Override
public void close() throws IOException {
closed.set(true);
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
index 63e14a3..e6c3ca9 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -1528,6 +1528,49 @@ public class KafkaIOTest {
}
}
+ /**
+ * Checks the offset consumer config with and without overrides. Without overrides the group id
+ * is generated (suffix "none" because no main group id is set). With overrides the supplied
+ * group id is used as-is. In both cases auto-commit is disabled and the isolation level is
+ * read_uncommitted.
+ */
+ @Test
+ public void testOffsetConsumerConfigOverrides() throws Exception {
+ // NOTE(review): raw KafkaUnboundedReader/KafkaUnboundedSource types produce unchecked
+ // warnings; consider parameterizing them.
+ KafkaUnboundedReader reader1 =
+ new KafkaUnboundedReader(
+ new KafkaUnboundedSource(
+ KafkaIO.read()
+ .withBootstrapServers("broker_1:9092,broker_2:9092")
+ .withTopic("my_topic")
+ // null must be accepted and mean "no overrides"
+ .withOffsetConsumerConfigOverrides(null),
+ 0),
+ null);
+ // Each getOffsetConsumerConfig() call draws a new random group id, so only its pattern is
+ // checked here.
+ assertTrue(
+ reader1
+ .getOffsetConsumerConfig()
+ .get(ConsumerConfig.GROUP_ID_CONFIG)
+ .toString()
+ .matches(".*_offset_consumer_\\d+_none"));
+ assertEquals(
+ false, reader1.getOffsetConsumerConfig().get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG));
+ assertEquals(
+ "read_uncommitted",
+ reader1.getOffsetConsumerConfig().get(ConsumerConfig.ISOLATION_LEVEL_CONFIG));
+
+ // An explicit group id override must replace the generated one.
+ String offsetGroupId = "group.offsetConsumer";
+ KafkaUnboundedReader reader2 =
+ new KafkaUnboundedReader(
+ new KafkaUnboundedSource(
+ KafkaIO.read()
+ .withBootstrapServers("broker_1:9092,broker_2:9092")
+ .withTopic("my_topic")
+ .withOffsetConsumerConfigOverrides(
+ ImmutableMap.of(ConsumerConfig.GROUP_ID_CONFIG, offsetGroupId)),
+ 0),
+ null);
+ assertEquals(
+ offsetGroupId, reader2.getOffsetConsumerConfig().get(ConsumerConfig.GROUP_ID_CONFIG));
+ assertEquals(
+ false, reader2.getOffsetConsumerConfig().get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG));
+ assertEquals(
+ "read_uncommitted",
+ reader2.getOffsetConsumerConfig().get(ConsumerConfig.ISOLATION_LEVEL_CONFIG));
+ }
+
private static void verifyProducerRecords(
MockProducer<Integer, Long> mockProducer,
String topic,