Posted to github@beam.apache.org by "kennknowles (via GitHub)" <gi...@apache.org> on 2023/02/01 19:22:05 UTC

[GitHub] [beam] kennknowles commented on a diff in pull request #24799: Create an Example IO to pair with the How to write an IO guide

kennknowles commented on code in PR #24799:
URL: https://github.com/apache/beam/pull/24799#discussion_r1093635250


##########
examples/java/src/main/java/org/apache/beam/examples/io/examplekafkaread/ExampleKafkaReadIOUtils.java:
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.io.examplekafkaread;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+
+public class ExampleKafkaReadIOUtils {
+  static final Map<String, Object> DEFAULT_CONSUMER_PROPERTIES =
+      ImmutableMap.of(
+          ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,

Review Comment:
   FWIW, this way of building a map makes me sad: autoformat always removes any distinction you try to create between keys and values. Maybe use a builder, so that keys and values are not conflated in the same argument list?
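
   A minimal sketch of one way to keep each key next to its value, using only the JDK so that it stands alone: `Map.ofEntries` with `Map.entry` (Java 9+). This is a stand-in for the suggested builder, not the PR's code; Guava's `ImmutableMap.builder()` with chained `put(key, value)` calls gives the same pairing. The class name `ConsumerDefaults` is hypothetical, and the string keys are the literal values of the corresponding `ConsumerConfig` constants, written out here only to avoid the Kafka dependency.

```java
import java.util.Map;

/** Hypothetical holder for default consumer properties (illustration only). */
final class ConsumerDefaults {
  // Written as a string to avoid depending on the Kafka client in this sketch.
  static final String BYTE_ARRAY_DESERIALIZER =
      "org.apache.kafka.common.serialization.ByteArrayDeserializer";

  // Each Map.entry(...) call holds exactly one key and one value, so an
  // autoformatter cannot flatten keys and values into one undifferentiated list.
  static final Map<String, Object> DEFAULT_CONSUMER_PROPERTIES =
      Map.ofEntries(
          Map.entry("key.deserializer", BYTE_ARRAY_DESERIALIZER),
          Map.entry("value.deserializer", BYTE_ARRAY_DESERIALIZER),
          Map.entry("receive.buffer.bytes", 512 * 1024),
          // Default to the latest offset when not resuming.
          Map.entry("auto.offset.reset", "latest"),
          // Disable auto commit of offsets; a user could re-enable it.
          Map.entry("enable.auto.commit", false));

  public static void main(String[] args) {
    DEFAULT_CONSUMER_PROPERTIES.forEach((k, v) -> System.out.println(k + " = " + v));
  }
}
```

   The resulting map is unmodifiable and rejects null keys and values, which is the same contract `ImmutableMap.of` has.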



##########
examples/java/src/main/java/org/apache/beam/examples/io/examplekafkaread/GenerateTopicPartitions.java:
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.io.examplekafkaread;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.util.Preconditions;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.checkerframework.checker.nullness.qual.NonNull;
+
+/**
+ * GenerateTopicPartitions is a straightforward DoFn designed to take the list of topics provided in
+ * its configuration, and yield all the current {@link TopicPartition}s for those topics
+ *
+ * <p>It consumes a byte[], but this is from an Impulse, and is just used to start the processing.
+ *
+ * <p>Because this process happens once and is bounded by the number of current TopicPartitions,
+ * there is no need to leverage advanced features for performance, stability, or correctness.
+ */
+public class GenerateTopicPartitions extends DoFn<byte[], TopicPartition> {
+  final Map<String, Object> consumerConfig;
+  final List<String> topics;
+
+  GenerateTopicPartitions(
+      @NonNull Map<String, Object> consumerConfig, @NonNull List<String> topics) {

Review Comment:
   `@NonNull` should be the default, so these parameters don't need to be annotated.



##########
examples/java/src/main/java/org/apache/beam/examples/io/examplekafkaread/ExampleKafkaReadIOUtils.java:
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.io.examplekafkaread;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+
+public class ExampleKafkaReadIOUtils {
+  static final Map<String, Object> DEFAULT_CONSUMER_PROPERTIES =
+      ImmutableMap.of(
+          ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+          ByteArrayDeserializer.class.getName(),
+          ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+          ByteArrayDeserializer.class.getName(),
+          ConsumerConfig.RECEIVE_BUFFER_CONFIG,
+          512 * 1024,
+          // default to latest offset when we are not resuming.
+          ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
+          "latest",
+          // disable auto commit of offsets. we don't require group_id. could be enabled by user.
+          ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
+          false);
+
+  static Map<String, Object> getOffsetConsumerConfig(
+      String name, Map<String, Object> consumerConfig) {
+    Map<String, Object> offsetConsumerConfig = new HashMap<>(consumerConfig);
+    offsetConsumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+
+    Object groupId = consumerConfig.get(ConsumerConfig.GROUP_ID_CONFIG);
+    // override group_id and disable auto_commit so that it does not interfere with main consumer
+    String offsetGroupId =
+        String.format(
+            "%s_offset_consumer_%d_%s",
+            name, new Random().nextInt(Integer.MAX_VALUE), (groupId == null ? "none" : groupId));
+    offsetConsumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, offsetGroupId);
+
+    // Force read isolation level to 'read_uncommitted' for offset consumer. This consumer
+    // fetches latest offset for two reasons: (a) to calculate backlog (number of records
+    // yet to be consumed) (b) to advance watermark if the backlog is zero. The right thing to do
+    // for (a) is to leave this config unchanged from the main config (i.e. if there are records
+    // that can't be read because of uncommitted records before them, they shouldn't
+    // ideally count towards backlog when "read_committed" is enabled). But (b)
+    // requires finding out if there are any records left to be read (committed or uncommitted).
+    // Rather than using two separate consumers we will go with better support for (b). If we do
+    // hit a case where a lot of records are not readable (due to some stuck transactions), the
+    // pipeline would report more backlog, but would not be able to consume it. It might be ok
+    // since CPU consumed on the workers would be low and will likely avoid unnecessary upscale.
+    offsetConsumerConfig.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_uncommitted");
+
+    return offsetConsumerConfig;
+  }
+
+  /**
+   * Returns a new config map that is a merge of the current config and updates. Verifies that the
+   * updates do not include ignored properties.
+   */
+  static Map<String, Object> updateKafkaProperties(
+      Map<String, Object> currentConfig, Map<String, Object> updates) {
+
+    Map<String, Object> config = new HashMap<>(currentConfig);

Review Comment:
   If it is not required to be a mutable map, I would suggest something like `currentConfig.toBuilder().putAll(updates).build()` or whatever the invocation should look like.
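
   A hedged sketch of the immutable-result idea, using only the JDK. As far as I know, Guava's `ImmutableMap` has no `toBuilder()`; the closest form is `ImmutableMap.<String, Object>builder().putAll(currentConfig).putAll(updates)`, and its `build()` throws on duplicate keys, which matters here because `updates` is meant to override existing entries. The version below therefore merges in a local `HashMap` and only exposes an unmodifiable view. The class name `ConfigMerge` and method name `merged` are hypothetical, and the check for ignored properties mentioned in the Javadoc is left out.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper (illustration only): merge configs without exposing a mutable map. */
final class ConfigMerge {
  /** Returns an unmodifiable map holding currentConfig with updates applied on top. */
  static Map<String, Object> merged(
      Map<String, Object> currentConfig, Map<String, Object> updates) {
    // The mutable map never escapes this method.
    Map<String, Object> config = new HashMap<>(currentConfig);
    // For keys present in both maps, the value from updates wins.
    config.putAll(updates);
    return Collections.unmodifiableMap(config);
  }
}
```

   Callers that try to `put` into the result get an `UnsupportedOperationException`, and neither input map is modified.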



##########
examples/java/src/main/java/org/apache/beam/examples/io/examplekafkaread/ExampleKafkaReadIOUtils.java:
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.io.examplekafkaread;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+
+public class ExampleKafkaReadIOUtils {
+  static final Map<String, Object> DEFAULT_CONSUMER_PROPERTIES =
+      ImmutableMap.of(
+          ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+          ByteArrayDeserializer.class.getName(),
+          ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+          ByteArrayDeserializer.class.getName(),
+          ConsumerConfig.RECEIVE_BUFFER_CONFIG,
+          512 * 1024,
+          // default to latest offset when we are not resuming.
+          ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
+          "latest",
+          // disable auto commit of offsets. we don't require group_id. could be enabled by user.
+          ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
+          false);
+
+  static Map<String, Object> getOffsetConsumerConfig(
+      String name, Map<String, Object> consumerConfig) {
+    Map<String, Object> offsetConsumerConfig = new HashMap<>(consumerConfig);
+    offsetConsumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+
+    Object groupId = consumerConfig.get(ConsumerConfig.GROUP_ID_CONFIG);

Review Comment:
   Does this need any validation? `Object` is quite general. The `String.format` call below will implicitly call `toString()` on it, which could produce something weird.
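
   One possible shape for such a check, as a sketch rather than a recommendation for the PR: accept a missing group id, accept a `String`, and reject anything else before it reaches the format call. The class and method names are hypothetical, `"group.id"` is the literal value of `ConsumerConfig.GROUP_ID_CONFIG`, and whether to throw or fall back to a default is a design choice for the author.

```java
import java.util.Map;

/** Hypothetical helper (illustration only): validate the configured group id. */
final class GroupIdValidation {
  /** Returns the group id, or "none" if absent; rejects values that are not Strings. */
  static String groupIdOrNone(Map<String, Object> consumerConfig) {
    // "group.id" is the literal value of ConsumerConfig.GROUP_ID_CONFIG.
    Object groupId = consumerConfig.get("group.id");
    if (groupId == null) {
      return "none";
    }
    if (!(groupId instanceof String)) {
      throw new IllegalArgumentException(
          "group.id must be a String, but was " + groupId.getClass().getName());
    }
    return (String) groupId;
  }
}
```

   With this in place, the offset consumer's group id would be built from a known `String` instead of relying on an arbitrary object's `toString()`.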


