Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2021/01/05 20:27:50 UTC

[GitHub] [druid] xvrl opened a new pull request #10730: Introduce KafkaRecordEntity to support Kafka headers in InputFormats

xvrl opened a new pull request #10730:
URL: https://github.com/apache/druid/pull/10730


   Today Kafka message support in streaming indexing tasks is limited to
   message values, and does not provide a way to expose Kafka headers,
   timestamps, or keys, which may be of interest to more specialized
   Druid input formats. For instance, Kafka headers may be used to indicate
   payload format/encoding or additional metadata, and timestamps are often
   omitted from values in Kafka streams applications, since they are
   included in the record.
   
   This change proposes to introduce KafkaRecordEntity as a new InputEntity,
   which would give input formats full access to the underlying Kafka record,
   including headers, key, and timestamp. It would also open up access to
   low-level information such as topic, partition, and offset if needed.
   
   KafkaRecordEntity is a subclass of ByteEntity, both for backwards
   compatibility with existing input formats and to avoid introducing
   unnecessary complexity for Kinesis indexing tasks.
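   
   For illustration only: a Kafka-aware InputFormat shipped in an extension
   could pull headers, key, and timestamp out of the entity roughly like this
   (a minimal sketch; `getRecord()` is the accessor added in this PR, while the
   "encoding" header and the surrounding class are made-up examples):
   
   ```java
   import java.nio.charset.StandardCharsets;
   import org.apache.druid.data.input.kafka.KafkaRecordEntity;
   import org.apache.kafka.clients.consumer.ConsumerRecord;
   import org.apache.kafka.common.header.Header;
   
   public class KafkaHeaderExample
   {
     // Summarizes the record metadata that becomes reachable through KafkaRecordEntity.
     static String describe(KafkaRecordEntity entity)
     {
       ConsumerRecord<byte[], byte[]> record = entity.getRecord();
       Header encoding = record.headers().lastHeader("encoding"); // null if the header is absent
       String enc = encoding == null ? "unknown" : new String(encoding.value(), StandardCharsets.UTF_8);
       return "topic=" + record.topic()
              + " partition=" + record.partition()
              + " offset=" + record.offset()
              + " timestamp=" + record.timestamp()
              + " encoding=" + enc;
     }
   }
   ```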


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] xvrl commented on a change in pull request #10730: Introduce KafkaRecordEntity to support Kafka headers in InputFormats

Posted by GitBox <gi...@apache.org>.
xvrl commented on a change in pull request #10730:
URL: https://github.com/apache/druid/pull/10730#discussion_r553586787



##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/OrderedPartitionableRecord.java
##########
@@ -74,7 +74,7 @@ public SequenceOffsetType getSequenceNumber()
   }
 
   @NotNull
-  public List<byte[]> getData()
+  public List<? extends ByteEntity> getData()

Review comment:
       I did encounter some of this in test cases, where we would call getData on a record multiple times, causing issues. In general, however, an InputEntity doesn't support being read multiple times: for `ByteEntity`, once it's been read via `InputEntity.open()`, the underlying buffer is exhausted and subsequent calls to `open()` return an empty stream.
   
   Except for one logging case, all the calls to `OrderedPartitionableRecord.getData()` I could find are only executed once per record, for the purpose of exposing InputEntities to parsers, which should ensure the buffers are only read once or otherwise handled appropriately. I updated the logging case to make sure it wouldn't modify the buffer position.
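   
   To make the read-once behavior concrete, here is a standalone sketch (it assumes `ByteEntity` streams directly over its backing buffer, as described above; the byte values are arbitrary):
   
   ```java
   import java.io.IOException;
   import java.io.InputStream;
   import java.nio.charset.StandardCharsets;
   import org.apache.druid.data.input.impl.ByteEntity;
   
   public class ByteEntityReadOnceExample
   {
     public static void main(String[] args) throws IOException
     {
       ByteEntity entity = new ByteEntity("hello".getBytes(StandardCharsets.UTF_8));
   
       try (InputStream first = entity.open()) {
         System.out.println(countBytes(first));  // 5: consumes the backing buffer
       }
       try (InputStream second = entity.open()) {
         System.out.println(countBytes(second)); // 0: buffer position is already at the limit
       }
     }
   
     private static int countBytes(InputStream in) throws IOException
     {
       int count = 0;
       while (in.read() != -1) {
         count++;
       }
       return count;
     }
   }
   ```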






[GitHub] [druid] xvrl commented on a change in pull request #10730: Introduce KafkaRecordEntity to support Kafka headers in InputFormats

Posted by GitBox <gi...@apache.org>.
xvrl commented on a change in pull request #10730:
URL: https://github.com/apache/druid/pull/10730#discussion_r552177542



##########
File path: extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafka/KafkaRecordEntity.java
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.kafka;
+
+import org.apache.druid.data.input.impl.ByteEntity;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+public class KafkaRecordEntity extends ByteEntity
+{
+  private final ConsumerRecord<byte[], byte[]> record;
+
+  public KafkaRecordEntity(ConsumerRecord<byte[], byte[]> record)
+  {
+    super(record.value());

Review comment:
       note, today we skip records with null values, so we still need to address how to handle those.






[GitHub] [druid] xvrl commented on a change in pull request #10730: Introduce KafkaRecordEntity to support Kafka headers in InputFormats

Posted by GitBox <gi...@apache.org>.
xvrl commented on a change in pull request #10730:
URL: https://github.com/apache/druid/pull/10730#discussion_r552341496



##########
File path: extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
##########
@@ -119,15 +120,16 @@ public void seekToLatest(Set<StreamPartition<Integer>> partitions)
 
   @Nonnull
   @Override
-  public List<OrderedPartitionableRecord<Integer, Long>> poll(long timeout)
+  public List<OrderedPartitionableRecord<Integer, Long, KafkaRecordEntity>> poll(long timeout)
   {
-    List<OrderedPartitionableRecord<Integer, Long>> polledRecords = new ArrayList<>();
+    List<OrderedPartitionableRecord<Integer, Long, KafkaRecordEntity>> polledRecords = new ArrayList<>();
     for (ConsumerRecord<byte[], byte[]> record : consumer.poll(Duration.ofMillis(timeout))) {
+
       polledRecords.add(new OrderedPartitionableRecord<>(
           record.topic(),
           record.partition(),
           record.offset(),
-          record.value() == null ? null : ImmutableList.of(record.value())
+          ImmutableList.of(new KafkaRecordEntity(record))

Review comment:
       note, today we skip records with null values, so we should decide if/how we want to allow passing those through. I'm thinking we could add a config option if needed that would expose them as empty byte arrays for backwards compatibility in ByteEntity, but allow the full record in KafkaRecordEntity. However, I have not found many use-cases for null values, so we can decide to punt on this and add this option later if needed.






[GitHub] [druid] gianm commented on pull request #10730: Introduce KafkaRecordEntity to support Kafka headers in InputFormats

Posted by GitBox <gi...@apache.org>.
gianm commented on pull request #10730:
URL: https://github.com/apache/druid/pull/10730#issuecomment-757000160


   > looks like tests are failing due to insufficient diff coverage for some lines. Many of them appear to be where we introduced additional type parameters. @gianm what's the guidance here?
   
   What I've done is either,
   
   - Add tests if it's something that really should be tested
   - Add a suppression if it's something that really shouldn't be tested
   - Ignore the coverage check if it's something that could be tested, but it isn't that important, and we want to skip it for now. (It's a diff-oriented check, so ignoring it won't affect future patches.)




[GitHub] [druid] xvrl commented on a change in pull request #10730: Introduce KafkaRecordEntity to support Kafka headers in InputFormats

Posted by GitBox <gi...@apache.org>.
xvrl commented on a change in pull request #10730:
URL: https://github.com/apache/druid/pull/10730#discussion_r553708247



##########
File path: extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java
##########
@@ -296,14 +279,22 @@ private Runnable fetchRecords()
             );
 
 
-            log.trace(
-                "Stream[%s] / partition[%s] / sequenceNum[%s] / bufferRemainingCapacity[%d]: %s",
-                currRecord.getStream(),
-                currRecord.getPartitionId(),
-                currRecord.getSequenceNumber(),
-                records.remainingCapacity(),
-                currRecord.getData().stream().map(StringUtils::fromUtf8).collect(Collectors.toList())
-            );
+            if (log.isTraceEnabled()) {

Review comment:
       let's hope so, using ByteEntities directly should also avoid having to copy around byte arrays

##########
File path: extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java
##########
@@ -296,14 +279,22 @@ private Runnable fetchRecords()
             );
 
 
-            log.trace(
-                "Stream[%s] / partition[%s] / sequenceNum[%s] / bufferRemainingCapacity[%d]: %s",
-                currRecord.getStream(),
-                currRecord.getPartitionId(),
-                currRecord.getSequenceNumber(),
-                records.remainingCapacity(),
-                currRecord.getData().stream().map(StringUtils::fromUtf8).collect(Collectors.toList())

Review comment:
       good point






[GitHub] [druid] gianm commented on a change in pull request #10730: Introduce KafkaRecordEntity to support Kafka headers in InputFormats

Posted by GitBox <gi...@apache.org>.
gianm commented on a change in pull request #10730:
URL: https://github.com/apache/druid/pull/10730#discussion_r553572213



##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/OrderedPartitionableRecord.java
##########
@@ -74,7 +74,7 @@ public SequenceOffsetType getSequenceNumber()
   }
 
   @NotNull
-  public List<byte[]> getData()
+  public List<? extends ByteEntity> getData()

Review comment:
       This change means we're returning a thing that wraps a `ByteBuffer` instead of a `byte[]`, which opens up potential issues due to callers handling the position and limit wrong. What are the expectations? Is it ok for callers to change the position of the underlying buffer, or should they refrain? If they do change it, does that cause problems?
   
   (For example: do we have situations where something calls `getData()` to parse a value, and that updates the position, and then later on `getData()` is called again for some other purpose like logging? If so — that'd break as a result of this change.)
   
   Whatever the case, the javadoc for this method should describe the conclusion.
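   
   For the secondary-read cases (such as logging), one defensive pattern is to work on a duplicate of the buffer so the shared position is never advanced. A rough sketch, assuming `ByteEntity#getBuffer` exposes the backing buffer:
   
   ```java
   import java.nio.ByteBuffer;
   import java.nio.charset.StandardCharsets;
   import org.apache.druid.data.input.impl.ByteEntity;
   
   public class BufferSafeLogging
   {
     // Decodes the entity's bytes for logging without moving the shared buffer's position.
     static String toStringForLog(ByteEntity entity)
     {
       ByteBuffer view = entity.getBuffer().duplicate(); // independent position/limit, shared content
       byte[] copy = new byte[view.remaining()];
       view.get(copy);
       return new String(copy, StandardCharsets.UTF_8);
     }
   }
   ```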

##########
File path: extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafka/KafkaRecordEntity.java
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.kafka;
+
+import org.apache.druid.data.input.impl.ByteEntity;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+public class KafkaRecordEntity extends ByteEntity

Review comment:
       Could you please add javadocs explaining how this class is meant to be used? Something like:
   
   > A ByteEntity generated by {@link KafkaRecordSupplier} and fed to any {@link InputFormat} used by Kafka indexing tasks. It can be used as a regular ByteEntity, in which case the Kafka message value is returned. But the {@link #getRecord} method also allows Kafka-aware InputFormat implementations to read the full Kafka message, including headers, key, and timestamp.
   >
   > This functionality is not yet exposed through any built-in InputFormats, but is available for use in extensions.

##########
File path: extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
##########
@@ -119,15 +120,16 @@ public void seekToLatest(Set<StreamPartition<Integer>> partitions)
 
   @Nonnull
   @Override
-  public List<OrderedPartitionableRecord<Integer, Long>> poll(long timeout)
+  public List<OrderedPartitionableRecord<Integer, Long, KafkaRecordEntity>> poll(long timeout)
   {
-    List<OrderedPartitionableRecord<Integer, Long>> polledRecords = new ArrayList<>();
+    List<OrderedPartitionableRecord<Integer, Long, KafkaRecordEntity>> polledRecords = new ArrayList<>();
     for (ConsumerRecord<byte[], byte[]> record : consumer.poll(Duration.ofMillis(timeout))) {
+
       polledRecords.add(new OrderedPartitionableRecord<>(
           record.topic(),
           record.partition(),
           record.offset(),
-          record.value() == null ? null : ImmutableList.of(record.value())
+          record.value() == null ? null : ImmutableList.of(new KafkaRecordEntity(record))

Review comment:
       We should put in a comment or javadoc noting that records with null values will be skipped, even if the other stuff (keys, timestamp, headers) are not null.
   
   Here's a suggestion:
   
   1. Add a note to the javadoc of KafkaRecordEntity mentioning that these entities won't be generated for messages with null values
   2. Add a null-value check to the constructor of KafkaRecordEntity enforcing that comment
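   
   Item 2 could look roughly like this (a sketch of the suggested guard layered on the class from the diff, not the code actually merged):
   
   ```java
   import com.google.common.base.Preconditions;
   import org.apache.druid.data.input.impl.ByteEntity;
   import org.apache.kafka.clients.consumer.ConsumerRecord;
   
   public class KafkaRecordEntity extends ByteEntity
   {
     private final ConsumerRecord<byte[], byte[]> record;
   
     public KafkaRecordEntity(ConsumerRecord<byte[], byte[]> record)
     {
       // Records with null values are skipped upstream; enforce that assumption here.
       super(Preconditions.checkNotNull(record.value(), "record.value()"));
       this.record = record;
     }
   
     public ConsumerRecord<byte[], byte[]> getRecord()
     {
       return record;
     }
   }
   ```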

##########
File path: extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
##########
@@ -199,41 +199,41 @@
   private static List<ProducerRecord<byte[], byte[]>> generateRecords(String topic)
   {
     return ImmutableList.of(
-        new ProducerRecord<>(topic, 0, null, jb("2008", "a", "y", "10", "20.0", "1.0")),
-        new ProducerRecord<>(topic, 0, null, jb("2009", "b", "y", "10", "20.0", "1.0")),
-        new ProducerRecord<>(topic, 0, null, jb("2010", "c", "y", "10", "20.0", "1.0")),
-        new ProducerRecord<>(topic, 0, null, jb("2011", "d", "y", "10", "20.0", "1.0")),
-        new ProducerRecord<>(topic, 0, null, jb("2011", "e", "y", "10", "20.0", "1.0")),
-        new ProducerRecord<>(topic, 0, null, jb("246140482-04-24T15:36:27.903Z", "x", "z", "10", "20.0", "1.0")),
+        new ProducerRecord<>(topic, 0, null, jbb("2008", "a", "y", "10", "20.0", "1.0")),

Review comment:
       Unless I missed it — could you add a test that uses a custom InputFormat and verifies that something other than the value can be read properly?

##########
File path: extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java
##########
@@ -296,14 +279,22 @@ private Runnable fetchRecords()
             );
 
 
-            log.trace(
-                "Stream[%s] / partition[%s] / sequenceNum[%s] / bufferRemainingCapacity[%d]: %s",
-                currRecord.getStream(),
-                currRecord.getPartitionId(),
-                currRecord.getSequenceNumber(),
-                records.remainingCapacity(),
-                currRecord.getData().stream().map(StringUtils::fromUtf8).collect(Collectors.toList())

Review comment:
       It's OK, because `fromUtf8` doesn't throw exceptions, it just replaces invalid sequences with the [replacement character](https://www.fileformat.info/info/unicode/char/fffd/index.htm). Although the log messages will look very weird.
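   
   For reference, the lenient decoding can be reproduced with the standard charset API (plain Java here, not Druid's `StringUtils`, which is assumed to behave the same way):
   
   ```java
   import java.nio.charset.StandardCharsets;
   
   public class LenientUtf8Example
   {
     public static void main(String[] args)
     {
       byte[] invalid = new byte[]{(byte) 0xC3, (byte) 0x28}; // 0xC3 expects a continuation byte; 0x28 is not one
       // String(byte[], Charset) never throws; malformed input becomes U+FFFD.
       System.out.println(new String(invalid, StandardCharsets.UTF_8)); // prints "�("
     }
   }
   ```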

##########
File path: extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java
##########
@@ -296,14 +279,22 @@ private Runnable fetchRecords()
             );
 
 
-            log.trace(
-                "Stream[%s] / partition[%s] / sequenceNum[%s] / bufferRemainingCapacity[%d]: %s",
-                currRecord.getStream(),
-                currRecord.getPartitionId(),
-                currRecord.getSequenceNumber(),
-                records.remainingCapacity(),
-                currRecord.getData().stream().map(StringUtils::fromUtf8).collect(Collectors.toList())
-            );
+            if (log.isTraceEnabled()) {

Review comment:
       Nice 😄
   
   I bet it will speed up Kinesis ingestion a bit.







[GitHub] [druid] xvrl commented on pull request #10730: Introduce KafkaRecordEntity to support Kafka headers in InputFormats

Posted by GitBox <gi...@apache.org>.
xvrl commented on pull request #10730:
URL: https://github.com/apache/druid/pull/10730#issuecomment-756995841


   looks like tests are failing due to insufficient diff coverage for some lines. Many of them appear to be where we introduced additional type parameters. @gianm what's the guidance here?




[GitHub] [druid] xvrl commented on a change in pull request #10730: Introduce KafkaRecordEntity to support Kafka headers in InputFormats

Posted by GitBox <gi...@apache.org>.
xvrl commented on a change in pull request #10730:
URL: https://github.com/apache/druid/pull/10730#discussion_r553710889



##########
File path: extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
##########
@@ -199,41 +199,41 @@
   private static List<ProducerRecord<byte[], byte[]>> generateRecords(String topic)
   {
     return ImmutableList.of(
-        new ProducerRecord<>(topic, 0, null, jb("2008", "a", "y", "10", "20.0", "1.0")),
-        new ProducerRecord<>(topic, 0, null, jb("2009", "b", "y", "10", "20.0", "1.0")),
-        new ProducerRecord<>(topic, 0, null, jb("2010", "c", "y", "10", "20.0", "1.0")),
-        new ProducerRecord<>(topic, 0, null, jb("2011", "d", "y", "10", "20.0", "1.0")),
-        new ProducerRecord<>(topic, 0, null, jb("2011", "e", "y", "10", "20.0", "1.0")),
-        new ProducerRecord<>(topic, 0, null, jb("246140482-04-24T15:36:27.903Z", "x", "z", "10", "20.0", "1.0")),
+        new ProducerRecord<>(topic, 0, null, jbb("2008", "a", "y", "10", "20.0", "1.0")),

Review comment:
       done








[GitHub] [druid] xvrl commented on a change in pull request #10730: Introduce KafkaRecordEntity to support Kafka headers in InputFormats

Posted by GitBox <gi...@apache.org>.
xvrl commented on a change in pull request #10730:
URL: https://github.com/apache/druid/pull/10730#discussion_r553486280



##########
File path: extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
##########
@@ -119,15 +120,16 @@ public void seekToLatest(Set<StreamPartition<Integer>> partitions)
 
   @Nonnull
   @Override
-  public List<OrderedPartitionableRecord<Integer, Long>> poll(long timeout)
+  public List<OrderedPartitionableRecord<Integer, Long, KafkaRecordEntity>> poll(long timeout)
   {
-    List<OrderedPartitionableRecord<Integer, Long>> polledRecords = new ArrayList<>();
+    List<OrderedPartitionableRecord<Integer, Long, KafkaRecordEntity>> polledRecords = new ArrayList<>();
     for (ConsumerRecord<byte[], byte[]> record : consumer.poll(Duration.ofMillis(timeout))) {
+
       polledRecords.add(new OrderedPartitionableRecord<>(
           record.topic(),
           record.partition(),
           record.offset(),
-          record.value() == null ? null : ImmutableList.of(record.value())
+          record.value() == null ? null : ImmutableList.of(new KafkaRecordEntity(record))

Review comment:
       @himanshug I reverted this change to keep the existing behavior










[GitHub] [druid] xvrl commented on pull request #10730: Introduce KafkaRecordEntity to support Kafka headers in InputFormats

Posted by GitBox <gi...@apache.org>.
xvrl commented on pull request #10730:
URL: https://github.com/apache/druid/pull/10730#issuecomment-757055227


   ok I will merge it as is then. Adding specific tests for the trace logging seems a little overkill. We can add it later if we feel strongly about it.




[GitHub] [druid] xvrl commented on a change in pull request #10730: Introduce KafkaRecordEntity to support Kafka headers in InputFormats

Posted by GitBox <gi...@apache.org>.
xvrl commented on a change in pull request #10730:
URL: https://github.com/apache/druid/pull/10730#discussion_r553074022



##########
File path: extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafka/KafkaRecordEntity.java
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.kafka;
+
+import org.apache.druid.data.input.impl.ByteEntity;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+public class KafkaRecordEntity extends ByteEntity
+{
+  private final ConsumerRecord<byte[], byte[]> record;
+
+  public KafkaRecordEntity(ConsumerRecord<byte[], byte[]> record)
+  {
+    super(record.value());
+    this.record = record;
+  }
+
+  public ConsumerRecord<byte[], byte[]> getRecord()

Review comment:
       that's correct. We could have a sample InputFormat for documentation purposes, though I'm not sure there is much value in it. A more generic Kafka input format that wraps multiple input formats would require a lot more thought, and I couldn't think of a one-size-fits-all approach that seemed very useful or free of additional complexity.






[GitHub] [druid] xvrl commented on a change in pull request #10730: Introduce KafkaRecordEntity to support Kafka headers in InputFormats

Posted by GitBox <gi...@apache.org>.
xvrl commented on a change in pull request #10730:
URL: https://github.com/apache/druid/pull/10730#discussion_r553708446



##########
File path: extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
##########
@@ -119,15 +120,16 @@ public void seekToLatest(Set<StreamPartition<Integer>> partitions)
 
   @Nonnull
   @Override
-  public List<OrderedPartitionableRecord<Integer, Long>> poll(long timeout)
+  public List<OrderedPartitionableRecord<Integer, Long, KafkaRecordEntity>> poll(long timeout)
   {
-    List<OrderedPartitionableRecord<Integer, Long>> polledRecords = new ArrayList<>();
+    List<OrderedPartitionableRecord<Integer, Long, KafkaRecordEntity>> polledRecords = new ArrayList<>();
     for (ConsumerRecord<byte[], byte[]> record : consumer.poll(Duration.ofMillis(timeout))) {
+
       polledRecords.add(new OrderedPartitionableRecord<>(
           record.topic(),
           record.partition(),
           record.offset(),
-          record.value() == null ? null : ImmutableList.of(record.value())
+          ImmutableList.of(new KafkaRecordEntity(record))

Review comment:
       I reverted to the original behavior for now






[GitHub] [druid] xvrl commented on a change in pull request #10730: Introduce KafkaRecordEntity to support Kafka headers in InputFormats

Posted by GitBox <gi...@apache.org>.
xvrl commented on a change in pull request #10730:
URL: https://github.com/apache/druid/pull/10730#discussion_r553515681



##########
File path: extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java
##########
@@ -296,14 +279,22 @@ private Runnable fetchRecords()
             );
 
 
-            log.trace(
-                "Stream[%s] / partition[%s] / sequenceNum[%s] / bufferRemainingCapacity[%d]: %s",
-                currRecord.getStream(),
-                currRecord.getPartitionId(),
-                currRecord.getSequenceNumber(),
-                records.remainingCapacity(),
-                currRecord.getData().stream().map(StringUtils::fromUtf8).collect(Collectors.toList())
-            );
+            if (log.isTraceEnabled()) {

Review comment:
       I added this check, to avoid unnecessarily deserializing all the bytes as strings
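   
   The pattern in question is the standard guarded-logging idiom, roughly as below (a sketch assuming Druid's `Logger#isTraceEnabled`/`trace`; the UTF-8 conversion is illustrative and uses a duplicated buffer so the shared position is untouched):
   
   ```java
   import java.nio.charset.StandardCharsets;
   import java.util.List;
   import java.util.stream.Collectors;
   import org.apache.druid.data.input.impl.ByteEntity;
   import org.apache.druid.java.util.common.logger.Logger;
   
   public class GuardedTraceLogging
   {
     private static final Logger log = new Logger(GuardedTraceLogging.class);
   
     // Only pay the cost of decoding every entity when trace logging is actually enabled.
     static void traceEntities(List<? extends ByteEntity> entities)
     {
       if (log.isTraceEnabled()) {
         log.trace(
             "polled values: %s",
             entities.stream()
                     .map(e -> StandardCharsets.UTF_8.decode(e.getBuffer().duplicate()).toString())
                     .collect(Collectors.toList())
         );
       }
     }
   }
   ```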








[GitHub] [druid] xvrl commented on a change in pull request #10730: Introduce KafkaRecordEntity to support Kafka headers in InputFormats

Posted by GitBox <gi...@apache.org>.
xvrl commented on a change in pull request #10730:
URL: https://github.com/apache/druid/pull/10730#discussion_r553074881



##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/OrderedPartitionableRecord.java
##########
@@ -55,7 +54,7 @@ public OrderedPartitionableRecord(
     this.stream = stream;
     this.partitionId = partitionId;
     this.sequenceNumber = sequenceNumber;
-    this.data = data == null ? ImmutableList.of() : data;
+    this.data = data;

Review comment:
       I reverted this change after seeing that we rely on null in a bunch of places.






[GitHub] [druid] xvrl commented on a change in pull request #10730: Introduce KafkaRecordEntity to support Kafka headers in InputFormats

Posted by GitBox <gi...@apache.org>.
xvrl commented on a change in pull request #10730:
URL: https://github.com/apache/druid/pull/10730#discussion_r553587776



##########
File path: extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
##########
@@ -199,41 +199,41 @@
   private static List<ProducerRecord<byte[], byte[]>> generateRecords(String topic)
   {
     return ImmutableList.of(
-        new ProducerRecord<>(topic, 0, null, jb("2008", "a", "y", "10", "20.0", "1.0")),
-        new ProducerRecord<>(topic, 0, null, jb("2009", "b", "y", "10", "20.0", "1.0")),
-        new ProducerRecord<>(topic, 0, null, jb("2010", "c", "y", "10", "20.0", "1.0")),
-        new ProducerRecord<>(topic, 0, null, jb("2011", "d", "y", "10", "20.0", "1.0")),
-        new ProducerRecord<>(topic, 0, null, jb("2011", "e", "y", "10", "20.0", "1.0")),
-        new ProducerRecord<>(topic, 0, null, jb("246140482-04-24T15:36:27.903Z", "x", "z", "10", "20.0", "1.0")),
+        new ProducerRecord<>(topic, 0, null, jbb("2008", "a", "y", "10", "20.0", "1.0")),

Review comment:
       makes sense, will add that






[GitHub] [druid] xvrl commented on a change in pull request #10730: Introduce KafkaRecordEntity to support Kafka headers in InputFormats

Posted by GitBox <gi...@apache.org>.
xvrl commented on a change in pull request #10730:
URL: https://github.com/apache/druid/pull/10730#discussion_r553516275



##########
File path: extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java
##########
@@ -296,14 +279,22 @@ private Runnable fetchRecords()
             );
 
 
-            log.trace(
-                "Stream[%s] / partition[%s] / sequenceNum[%s] / bufferRemainingCapacity[%d]: %s",
-                currRecord.getStream(),
-                currRecord.getPartitionId(),
-                currRecord.getSequenceNumber(),
-                records.remainingCapacity(),
-                currRecord.getData().stream().map(StringUtils::fromUtf8).collect(Collectors.toList())

Review comment:
       I'm not even sure this is safe to do. Some byte arrays could technically be invalid UTF-8 sequences, which would lead to exceptions when trying to log values.






[GitHub] [druid] xvrl commented on a change in pull request #10730: Introduce KafkaRecordEntity to support Kafka headers in InputFormats

Posted by GitBox <gi...@apache.org>.
xvrl commented on a change in pull request #10730:
URL: https://github.com/apache/druid/pull/10730#discussion_r553707941



##########
File path: extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
##########
@@ -119,15 +120,16 @@ public void seekToLatest(Set<StreamPartition<Integer>> partitions)
 
   @Nonnull
   @Override
-  public List<OrderedPartitionableRecord<Integer, Long>> poll(long timeout)
+  public List<OrderedPartitionableRecord<Integer, Long, KafkaRecordEntity>> poll(long timeout)
   {
-    List<OrderedPartitionableRecord<Integer, Long>> polledRecords = new ArrayList<>();
+    List<OrderedPartitionableRecord<Integer, Long, KafkaRecordEntity>> polledRecords = new ArrayList<>();
     for (ConsumerRecord<byte[], byte[]> record : consumer.poll(Duration.ofMillis(timeout))) {
+
       polledRecords.add(new OrderedPartitionableRecord<>(
           record.topic(),
           record.partition(),
           record.offset(),
-          record.value() == null ? null : ImmutableList.of(record.value())
+          record.value() == null ? null : ImmutableList.of(new KafkaRecordEntity(record))

Review comment:
       done, KafkaRecordEntity will already barf on null values when instantiating ByteEntity








[GitHub] [druid] xvrl merged pull request #10730: Introduce KafkaRecordEntity to support Kafka headers in InputFormats

Posted by GitBox <gi...@apache.org>.
xvrl merged pull request #10730:
URL: https://github.com/apache/druid/pull/10730


   





[GitHub] [druid] himanshug commented on a change in pull request #10730: Introduce KafkaRecordEntity to support Kafka headers in InputFormats

Posted by GitBox <gi...@apache.org>.
himanshug commented on a change in pull request #10730:
URL: https://github.com/apache/druid/pull/10730#discussion_r553063732



##########
File path: extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java
##########
@@ -119,15 +120,16 @@ public void seekToLatest(Set<StreamPartition<Integer>> partitions)
 
   @Nonnull
   @Override
-  public List<OrderedPartitionableRecord<Integer, Long>> poll(long timeout)
+  public List<OrderedPartitionableRecord<Integer, Long, KafkaRecordEntity>> poll(long timeout)
   {
-    List<OrderedPartitionableRecord<Integer, Long>> polledRecords = new ArrayList<>();
+    List<OrderedPartitionableRecord<Integer, Long, KafkaRecordEntity>> polledRecords = new ArrayList<>();
     for (ConsumerRecord<byte[], byte[]> record : consumer.poll(Duration.ofMillis(timeout))) {
+
       polledRecords.add(new OrderedPartitionableRecord<>(
           record.topic(),
           record.partition(),
           record.offset(),
-          record.value() == null ? null : ImmutableList.of(record.value())
+          ImmutableList.of(new KafkaRecordEntity(record))

Review comment:
       I think it is fine as long as it doesn't cause an unhandled NPE somewhere downstream and just counts as an `unparseable` record for existing pipelines.

##########
File path: extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafka/KafkaRecordEntity.java
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.kafka;
+
+import org.apache.druid.data.input.impl.ByteEntity;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+public class KafkaRecordEntity extends ByteEntity
+{
+  private final ConsumerRecord<byte[], byte[]> record;
+
+  public KafkaRecordEntity(ConsumerRecord<byte[], byte[]> record)
+  {
+    super(record.value());
+    this.record = record;
+  }
+
+  public ConsumerRecord<byte[], byte[]> getRecord()

Review comment:
       not used anywhere, is this only to allow writing extensions with a custom InputFormat which can take advantage of the extra metadata available?

##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/OrderedPartitionableRecord.java
##########
@@ -55,7 +54,7 @@ public OrderedPartitionableRecord(
     this.stream = stream;
     this.partitionId = partitionId;
     this.sequenceNumber = sequenceNumber;
-    this.data = data == null ? ImmutableList.of() : data;
+    this.data = data;

Review comment:
       why remove the null check and conversion to empty list?



