Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/06/25 07:02:26 UTC

[GitHub] [beam] ihji commented on a change in pull request #15028: [BEAM-12076] Adds a Kafka external read transform for reading with metadata

ihji commented on a change in pull request #15028:
URL: https://github.com/apache/beam/pull/15028#discussion_r658512414



##########
File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
##########
@@ -1544,6 +1571,152 @@ public void populateDisplayData(DisplayData.Builder builder) {
     }
   }
 
+  @DefaultSchema(JavaFieldSchema.class)
+  @SuppressFBWarnings("URF_UNREAD_FIELD")
+  /**
+   * Represents a Kafka header. We define a new class so that we can add schema annotations for
+   * generating Rows.
+   */
+  static class KafkaHeader {
+
+    String key;
+    byte[] value;
+
+    @SchemaCreate
+    public KafkaHeader(String key, byte[] value) {
+      this.key = key;
+      this.value = value;
+    }
+  }
+
+  @DefaultSchema(JavaFieldSchema.class)
+  @SuppressFBWarnings("URF_UNREAD_FIELD")
+  /**
+   * Represents a Kafka record with metadata. This class should only be used to represent a Kafka
+   * record for external transforms. TODO(BEAM-7345): use regular KafkaRecord class when Beam Schema
+   * inference supports generics.
+   */
+  static class ExternalKafkaRecord {
+
+    String topic;
+    int partition;
+    long offset;
+    long timestamp;
+    byte[] key;
+    byte[] value;
+    List<KafkaHeader> headers;
+    int timestampTypeId;
+    String timestampTypeName;
+
+    @SchemaCreate
+    public ExternalKafkaRecord(
+        String topic,
+        int partition,
+        long offset,
+        long timestamp,
+        byte[] key,
+        byte[] value,
+        @Nullable List<KafkaHeader> headers,
+        int timestampTypeId,
+        String timestampTypeName) {
+      this.topic = topic;
+      this.partition = partition;
+      this.offset = offset;
+      this.timestamp = timestamp;
+      this.key = key;
+      this.value = value;
+      this.headers = headers;
+      this.timestampTypeId = timestampTypeId;
+      this.timestampTypeName = timestampTypeName;
+    }
+  }
+
+  /**
+   * A {@link PTransform} to read from Kafka topics. Similar to {@link KafkaIO.Read}, but generates
+   * a {@link PCollection} of {@link Row}. This class is primarily used as a cross-language
+   * transform since {@link KafkaRecord} is not a type that can be easily encoded using Beam's
+   * standard coders. See {@link KafkaIO} for more information on usage and configuration of reader.
+   */
+  static class ExternalWithMetadata<K, V> extends PTransform<PBegin, PCollection<Row>> {
+    private final Read<K, V> read;
+
+    ExternalWithMetadata(Read<K, V> read) {
+      super("KafkaIO.Read");

Review comment:
      `TypedWithoutMetadata` also sets the same PTransform name, `KafkaIO.Read`:
   ```java
   TypedWithoutMetadata(Read<K, V> read) {
     super("KafkaIO.Read");
     this.read = read;
   }
   ```
   Should we use different names for easier debugging?
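A rough sketch of the suggestion, assuming each wrapper simply passes its own default name to the name-taking constructor, the way Beam's `PTransform` does through its protected `PTransform(String name)` constructor and `getName()`. To keep it runnable without Beam on the classpath, `NamedTransform` is a hypothetical stand-in for `PTransform`, and the names `KafkaIO.ReadWithMetadata` / `KafkaIO.ReadWithoutMetadata` are only illustrative, not names from this PR.

```java
/** Runnable sketch; every type here is a hypothetical stand-in, not a Beam class. */
class TransformNames {

  /** Stand-in for Beam's PTransform: keeps the default name given to its constructor. */
  abstract static class NamedTransform {
    private final String name;

    protected NamedTransform(String name) {
      this.name = name;
    }

    String getName() {
      return name;
    }
  }

  /** Mirrors TypedWithoutMetadata, but with its own default instead of "KafkaIO.Read". */
  static final class TypedWithoutMetadata extends NamedTransform {
    TypedWithoutMetadata() {
      super("KafkaIO.ReadWithoutMetadata");
    }
  }

  /** Mirrors ExternalWithMetadata, likewise with a distinct default name. */
  static final class ExternalWithMetadata extends NamedTransform {
    ExternalWithMetadata() {
      super("KafkaIO.ReadWithMetadata");
    }
  }

  public static void main(String[] args) {
    NamedTransform[] transforms = {new TypedWithoutMetadata(), new ExternalWithMetadata()};
    for (NamedTransform t : transforms) {
      // With distinct defaults, the step name alone shows which wrapper produced it.
      System.out.println(t.getClass().getSimpleName() + " -> " + t.getName());
    }
  }
}
```

A name passed explicitly at application time, e.g. `pipeline.apply("ReadFromKafka", transform)`, still takes precedence over these defaults, so distinct defaults mainly help when the transform is applied without a name.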

##########
File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
##########
@@ -1544,6 +1571,152 @@ public void populateDisplayData(DisplayData.Builder builder) {
     }
   }
 
+  @DefaultSchema(JavaFieldSchema.class)
+  @SuppressFBWarnings("URF_UNREAD_FIELD")
+  /**
+   * Represents a Kafka header. We define a new class so that we can add schema annotations for
+   * generating Rows.
+   */
+  static class KafkaHeader {
+
+    String key;
+    byte[] value;
+
+    @SchemaCreate
+    public KafkaHeader(String key, byte[] value) {
+      this.key = key;
+      this.value = value;
+    }
+  }
+
+  @DefaultSchema(JavaFieldSchema.class)
+  @SuppressFBWarnings("URF_UNREAD_FIELD")
+  /**
+   * Represents a Kafka record with metadata. This class should only be used to represent a Kafka
+   * record for external transforms. TODO(BEAM-7345): use regular KafkaRecord class when Beam Schema
+   * inference supports generics.
+   */
+  static class ExternalKafkaRecord {

Review comment:
      I think `ExternalKafkaRecord` is somewhat misleading. I would prefer something more intuitive (e.g. `RowCodedKafkaRecord` or `KafkaRowWithMetadata`).

##########
File path: sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
##########
@@ -777,14 +778,22 @@ private static Coder resolveCoder(Class deserializer) {
     @AutoService(ExternalTransformRegistrar.class)
     public static class External implements ExternalTransformRegistrar {
 
-      public static final String URN = "beam:external:java:kafka:read:v1";
+      // Using the transform name in the URN so that the corresponding transform can be easily
+      // identified.
+      public static final String URN_WITH_METADATA =
+          "beam:external:java:kafkaio:externalwithmetadata:v1";

Review comment:
      Is it necessary to repeat `external` in `externalwithmetadata`? The URN already contains `external`.
   
   How about `typedwithmetadata` or `rowwithmetadata`?
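For illustration, a small hypothetical helper (`KafkaIoUrns.urnFor`, not part of KafkaIO) that follows the diff's comment about putting the transform name in the URN. Because the shared `beam:external:java:kafkaio:` prefix already marks the transform as external, the name segment only needs to distinguish the variant. The prefix and the `v1` suffix are copied from `URN_WITH_METADATA` above; the helper itself and the lowercasing rule are assumptions.

```java
import java.util.Locale;

/** Hypothetical helper, not part of KafkaIO: builds a URN from a transform's simple name. */
final class KafkaIoUrns {
  // Prefix and version taken from URN_WITH_METADATA in the diff.
  private static final String PREFIX = "beam:external:java:kafkaio:";
  private static final String VERSION = "v1";

  private KafkaIoUrns() {}

  /** Lowercases the transform name and wraps it in the shared prefix and version. */
  static String urnFor(String transformName) {
    if (transformName == null || transformName.isEmpty()) {
      throw new IllegalArgumentException("transformName must be non-empty");
    }
    return PREFIX + transformName.toLowerCase(Locale.ROOT) + ":" + VERSION;
  }

  public static void main(String[] args) {
    System.out.println(urnFor("TypedWithMetadata"));
    System.out.println(urnFor("RowWithMetadata"));
  }
}
```

Running `main` prints `beam:external:java:kafkaio:typedwithmetadata:v1` and `beam:external:java:kafkaio:rowwithmetadata:v1`, neither of which repeats `external`.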




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org