Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2021/01/07 20:33:00 UTC

[GitHub] [druid] gianm commented on a change in pull request #10730: Introduce KafkaRecordEntity to support Kafka headers in InputFormats

gianm commented on a change in pull request #10730:
URL: https://github.com/apache/druid/pull/10730#discussion_r553572213



##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/OrderedPartitionableRecord.java
##########
@@ -74,7 +74,7 @@ public SequenceOffsetType getSequenceNumber()
   }
 
   @NotNull
-  public List<byte[]> getData()
+  public List<? extends ByteEntity> getData()

Review comment:
       This change means callers now get `ByteEntity` objects backed by a `ByteBuffer` instead of a `byte[]`, which opens up potential issues if callers mishandle the buffer's position and limit. What are the expectations? Is it OK for callers to change the position of the underlying buffer, or should they refrain? If they do change it, does that cause problems?
   
   (For example: do we have situations where something calls `getData()` to parse a value, which updates the position, and then later `getData()` is called again for some other purpose, like logging? If so, that would break as a result of this change.)
   
   Whatever the case, the javadoc for this method should describe the conclusion.
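   To make the hazard concrete, here is a minimal sketch using plain `java.nio` (no Druid classes). The helpers `consume` and `peek` are hypothetical: `consume` reads with relative gets and so advances the shared buffer's position, while `peek` reads through `duplicate()`, which shares the bytes but has its own position and limit. Reading through a duplicate (or `asReadOnlyBuffer()`) is one possible convention to document; it is an assumption here, not what the PR settled on.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class BufferPositionSketch
{
  /** Reads the remaining bytes with relative gets, which advances the buffer's position to its limit. */
  static String consume(ByteBuffer buffer)
  {
    byte[] bytes = new byte[buffer.remaining()];
    buffer.get(bytes);
    return new String(bytes, StandardCharsets.UTF_8);
  }

  /** Reads the same bytes through a duplicate, leaving the shared buffer's position untouched. */
  static String peek(ByteBuffer buffer)
  {
    return consume(buffer.duplicate());
  }

  public static void main(String[] args)
  {
    ByteBuffer shared = ByteBuffer.wrap("{\"a\":1}".getBytes(StandardCharsets.UTF_8));

    // Safe: both reads see the full payload.
    System.out.println(peek(shared));
    System.out.println(peek(shared));

    // Unsafe: the first caller "parses" the value, and a later caller (e.g. logging) sees nothing.
    System.out.println(consume(shared));
    System.out.println("[" + consume(shared) + "]"); // prints "[]"
  }
}
```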

##########
File path: extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafka/KafkaRecordEntity.java
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.kafka;
+
+import org.apache.druid.data.input.impl.ByteEntity;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+public class KafkaRecordEntity extends ByteEntity

Review comment:
       Could you please add javadocs explaining how this class is meant to be used? Something like:
   
   > A ByteEntity generated by {@link KafkaRecordSupplier} and fed to any {@link InputFormat} used by Kafka indexing tasks. It can be used as a regular ByteEntity, in which case the Kafka message value is returned. But the {@link #getRecord} method also allows Kafka-aware InputFormat implementations to read the full Kafka message, including headers, key, and timestamp.
   >
   > This functionality is not yet exposed through any built-in InputFormats, but is available for use in extensions.
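   A rough sketch of how a Kafka-aware reader could use such an entity: check for the subclass, read from the full record, and otherwise treat it as a plain ByteEntity. To keep it self-contained, `ByteEntity`, `Message`, and `KafkaRecordEntity` below are simplified stand-ins written for illustration (the real classes come from Druid and the Kafka client, where headers are read via `ConsumerRecord.headers()`), and the `"source"` header and `readSourceHeader` helper are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;

public class KafkaAwareReaderSketch
{
  /** Stand-in for Druid's ByteEntity: exposes the message value as a ByteBuffer. */
  static class ByteEntity
  {
    private final ByteBuffer buffer;

    ByteEntity(ByteBuffer buffer)
    {
      this.buffer = buffer;
    }

    ByteBuffer getBuffer()
    {
      return buffer;
    }
  }

  /** Stand-in for Kafka's ConsumerRecord, reduced to the value and string-keyed headers. */
  static final class Message
  {
    final byte[] value;
    final Map<String, byte[]> headers;

    Message(byte[] value, Map<String, byte[]> headers)
    {
      this.value = value;
      this.headers = headers;
    }
  }

  /** Stand-in for KafkaRecordEntity: a ByteEntity over the value that also keeps the full record. */
  static class KafkaRecordEntity extends ByteEntity
  {
    private final Message record;

    KafkaRecordEntity(Message record)
    {
      super(ByteBuffer.wrap(record.value));
      this.record = record;
    }

    Message getRecord()
    {
      return record;
    }
  }

  /** Hypothetical Kafka-aware read: returns the "source" header, or null if it is unavailable. */
  static String readSourceHeader(ByteEntity entity)
  {
    if (!(entity instanceof KafkaRecordEntity)) {
      return null; // a plain ByteEntity only carries the message value
    }
    byte[] header = ((KafkaRecordEntity) entity).getRecord().headers.get("source");
    return header == null ? null : new String(header, StandardCharsets.UTF_8);
  }

  public static void main(String[] args)
  {
    Message message = new Message(
        "{\"a\":1}".getBytes(StandardCharsets.UTF_8),
        Collections.singletonMap("source", "web".getBytes(StandardCharsets.UTF_8))
    );
    System.out.println(readSourceHeader(new KafkaRecordEntity(message))); // web
    System.out.println(readSourceHeader(new ByteEntity(ByteBuffer.wrap(message.value)))); // null
  }
}
```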

##########
File path: extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
##########
@@ -119,15 +120,16 @@ public void seekToLatest(Set<StreamPartition<Integer>> partitions)
 
   @Nonnull
   @Override
-  public List<OrderedPartitionableRecord<Integer, Long>> poll(long timeout)
+  public List<OrderedPartitionableRecord<Integer, Long, KafkaRecordEntity>> poll(long timeout)
   {
-    List<OrderedPartitionableRecord<Integer, Long>> polledRecords = new ArrayList<>();
+    List<OrderedPartitionableRecord<Integer, Long, KafkaRecordEntity>> polledRecords = new ArrayList<>();
     for (ConsumerRecord<byte[], byte[]> record : consumer.poll(Duration.ofMillis(timeout))) {
+
       polledRecords.add(new OrderedPartitionableRecord<>(
           record.topic(),
           record.partition(),
           record.offset(),
-          record.value() == null ? null : ImmutableList.of(record.value())
+          record.value() == null ? null : ImmutableList.of(new KafkaRecordEntity(record))

Review comment:
       We should add a comment or javadoc noting that records with null values will be skipped, even if the other fields (key, timestamp, headers) are not null.
   
   Here's a suggestion:
   
   1. Add a note to the javadoc of KafkaRecordEntity mentioning that these entities won't be generated for messages with null values
   2. Add a null-value check to the KafkaRecordEntity constructor that enforces this
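   Roughly, the two suggestions could look like this. It is a sketch under assumptions: `Message` and `KafkaRecordEntity` here are simplified stand-ins (the real entity extends Druid's ByteEntity and wraps a Kafka `ConsumerRecord<byte[], byte[]>`), the exception type and message are illustrative, and `toEntityOrNull` is a hypothetical helper mirroring the null check already in `poll()`.

```java
import java.nio.ByteBuffer;

public class NullValueCheckSketch
{
  /** Stand-in for Kafka's ConsumerRecord<byte[], byte[]>, reduced to key and value. */
  static final class Message
  {
    final byte[] key;
    final byte[] value;

    Message(byte[] key, byte[] value)
    {
      this.key = key;
      this.value = value;
    }
  }

  /**
   * Stand-in for KafkaRecordEntity.
   * <p>
   * Entities are never created for messages with a null value: such messages are skipped,
   * even if their key, timestamp, or headers are non-null.
   */
  static final class KafkaRecordEntity
  {
    private final ByteBuffer buffer;
    private final Message record;

    KafkaRecordEntity(Message record)
    {
      // Enforce the contract documented above.
      if (record.value == null) {
        throw new IllegalArgumentException("KafkaRecordEntity requires a message with a non-null value");
      }
      this.buffer = ByteBuffer.wrap(record.value);
      this.record = record;
    }

    ByteBuffer getBuffer()
    {
      return buffer;
    }

    Message getRecord()
    {
      return record;
    }
  }

  /** Mirrors the supplier-side check: no entity (null data) for null-valued messages. */
  static KafkaRecordEntity toEntityOrNull(Message record)
  {
    return record.value == null ? null : new KafkaRecordEntity(record);
  }

  public static void main(String[] args)
  {
    Message nullValued = new Message(new byte[]{1}, null); // key set, value null
    System.out.println(toEntityOrNull(nullValued)); // null
  }
}
```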

##########
File path: extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
##########
@@ -199,41 +199,41 @@
   private static List<ProducerRecord<byte[], byte[]>> generateRecords(String topic)
   {
     return ImmutableList.of(
-        new ProducerRecord<>(topic, 0, null, jb("2008", "a", "y", "10", "20.0", "1.0")),
-        new ProducerRecord<>(topic, 0, null, jb("2009", "b", "y", "10", "20.0", "1.0")),
-        new ProducerRecord<>(topic, 0, null, jb("2010", "c", "y", "10", "20.0", "1.0")),
-        new ProducerRecord<>(topic, 0, null, jb("2011", "d", "y", "10", "20.0", "1.0")),
-        new ProducerRecord<>(topic, 0, null, jb("2011", "e", "y", "10", "20.0", "1.0")),
-        new ProducerRecord<>(topic, 0, null, jb("246140482-04-24T15:36:27.903Z", "x", "z", "10", "20.0", "1.0")),
+        new ProducerRecord<>(topic, 0, null, jbb("2008", "a", "y", "10", "20.0", "1.0")),

Review comment:
       Unless I missed it, could you add a test that uses a custom InputFormat and verifies that something other than the value (for example, a header or the key) can be read properly?

##########
File path: extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java
##########
@@ -296,14 +279,22 @@ private Runnable fetchRecords()
             );
 
 
-            log.trace(
-                "Stream[%s] / partition[%s] / sequenceNum[%s] / bufferRemainingCapacity[%d]: %s",
-                currRecord.getStream(),
-                currRecord.getPartitionId(),
-                currRecord.getSequenceNumber(),
-                records.remainingCapacity(),
-                currRecord.getData().stream().map(StringUtils::fromUtf8).collect(Collectors.toList())

Review comment:
       It's OK, because `fromUtf8` doesn't throw exceptions; it just replaces invalid sequences with the [replacement character](https://www.fileformat.info/info/unicode/char/fffd/index.htm), although the log messages will look very weird.
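   For reference, the JDK's `String(byte[], Charset)` constructor documents this behavior: malformed input is replaced with the charset's default replacement, which for UTF-8 is U+FFFD. A minimal check, assuming `StringUtils.fromUtf8` delegates to that constructor (`decode` is a hypothetical helper):

```java
import java.nio.charset.StandardCharsets;

public class Utf8ReplacementSketch
{
  /** Decodes bytes as UTF-8; malformed sequences become U+FFFD instead of throwing. */
  static String decode(byte[] bytes)
  {
    return new String(bytes, StandardCharsets.UTF_8);
  }

  public static void main(String[] args)
  {
    // 0xFF can never appear in well-formed UTF-8, so it stands in for a binary payload.
    byte[] binaryPayload = {'a', (byte) 0xFF, 'b'};
    System.out.println(decode(binaryPayload).equals("a\uFFFDb")); // true
  }
}
```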

##########
File path: extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java
##########
@@ -296,14 +279,22 @@ private Runnable fetchRecords()
             );
 
 
-            log.trace(
-                "Stream[%s] / partition[%s] / sequenceNum[%s] / bufferRemainingCapacity[%d]: %s",
-                currRecord.getStream(),
-                currRecord.getPartitionId(),
-                currRecord.getSequenceNumber(),
-                records.remainingCapacity(),
-                currRecord.getData().stream().map(StringUtils::fromUtf8).collect(Collectors.toList())
-            );
+            if (log.isTraceEnabled()) {

Review comment:
       Nice 😄
   
   I bet it will speed up Kinesis ingestion a bit.
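   The reason: Java evaluates method arguments eagerly, so the old `log.trace(...)` call decoded every payload to strings for each record even when trace logging was off. A minimal sketch of the guard pattern, using `java.util.logging` as a stand-in for Druid's logger (`Level.FINEST` plays the role of trace and `isLoggable` the role of `isTraceEnabled()`); `renderPayload`, `logRecord`, and the `renderCalls` counter are hypothetical:

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

public class TraceGuardSketch
{
  static final Logger LOG = Logger.getLogger(TraceGuardSketch.class.getName());

  /** Counts how often the expensive payload rendering actually runs. */
  static int renderCalls = 0;

  /** Stand-in for decoding every record payload into strings for the log line. */
  static List<String> renderPayload(List<byte[]> data)
  {
    renderCalls++;
    List<String> out = new ArrayList<>();
    for (byte[] bytes : data) {
      out.add(new String(bytes, StandardCharsets.UTF_8));
    }
    return out;
  }

  static void logRecord(String stream, List<byte[]> data)
  {
    // Without this guard, renderPayload would run for every record even if the message is discarded.
    if (LOG.isLoggable(Level.FINEST)) {
      LOG.finest("Stream[" + stream + "]: " + renderPayload(data));
    }
  }

  public static void main(String[] args)
  {
    LOG.setLevel(Level.INFO); // trace-level (FINEST) logging disabled
    logRecord("example-stream", Collections.singletonList(new byte[]{'x'}));
    System.out.println(renderCalls); // 0: the payload was never decoded
  }
}
```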




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
