Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2021/12/21 02:40:58 UTC

[GitHub] [pinot] sajjad-moradi commented on a change in pull request #7927: Fix realtime ingestion when an entire batch of messages is filtered out

sajjad-moradi commented on a change in pull request #7927:
URL: https://github.com/apache/pinot/pull/7927#discussion_r772769012



##########
File path: pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaMessageBatch.java
##########
@@ -30,19 +27,26 @@
 
 public class KafkaMessageBatch implements MessageBatch<byte[]> {
 
-  private List<MessageAndOffset> _messageList = new ArrayList<>();
+  private final List<MessageAndOffset> _messageList;
+  private final int _unfilteredMessageCount;
+  private final long _lastOffset;
 
-  public KafkaMessageBatch(Iterable<ConsumerRecord<String, Bytes>> iterable) {
-    for (ConsumerRecord<String, Bytes> record : iterable) {
-      _messageList.add(new MessageAndOffset(record.value().get(), record.offset()));
-    }
+  public KafkaMessageBatch(int unfilteredMessageCount, long lastOffset, List<MessageAndOffset> batch) {

Review comment:
       Please add javadoc for the arguments and explain why they are needed.
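
   For illustration, a rough sketch of what the requested Javadoc could look like (the wording is a guess at the intent; only the parameter and field names come from the diff above):

   ```java
   /**
    * @param unfilteredMessageCount number of messages returned by the stream before any
    *        filtering was applied; needed so the consumer can account for messages that
    *        were read but carried no usable payload
    * @param lastOffset offset of the last message in the unfiltered batch; needed so the
    *        consuming offset can advance even when every message is filtered out
    * @param batch the messages that survived filtering, in offset order
    */
   public KafkaMessageBatch(int unfilteredMessageCount, long lastOffset, List<MessageAndOffset> batch) {
     _messageList = batch;
     _unfilteredMessageCount = unfilteredMessageCount;
     _lastOffset = lastOffset;
   }
   ```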

##########
File path: pinot-spi/src/main/java/org/apache/pinot/spi/stream/MessageBatch.java
##########
@@ -82,6 +88,13 @@ default StreamPartitionMsgOffset getNextStreamParitionMsgOffsetAtIndex(int index
     return new LongMsgOffset(getNextStreamMessageOffsetAtIndex(index));
   }
 
+  /**
+   * @return last offset in the batch
+   */
+  default StreamPartitionMsgOffset getLastOffset() {
+    return getNextStreamParitionMsgOffsetAtIndex(getMessageCount() - 1);

Review comment:
       `getMessageCount() - 1` is not a valid index when the whole batch is filtered out: `getMessageCount()` returns 0 in that case, so the index would be -1. Since this method is overridden for Kafka 2.0 and the other streams are not supposed to use it, at least for now, I believe we should throw an `IllegalStateException` here.
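
   A minimal sketch of that suggestion, assuming only the Kafka 2.0 plugin overrides this default (the exception message is illustrative, not from the PR):

   ```java
   /**
    * @return last offset in the batch
    */
   default StreamPartitionMsgOffset getLastOffset() {
     // Not meaningful for streams that do not track the unfiltered batch;
     // the Kafka 2.0 plugin overrides this with the real last offset.
     throw new IllegalStateException("getLastOffset() is not supported by this stream");
   }
   ```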

##########
File path: pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java
##########
@@ -18,61 +18,53 @@
  */
 package org.apache.pinot.plugin.stream.kafka20;
 
-import com.google.common.collect.Iterables;
-import java.io.IOException;
 import java.time.Duration;
+import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.TimeoutException;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.pinot.plugin.stream.kafka.MessageAndOffset;
 import org.apache.pinot.spi.stream.LongMsgOffset;
 import org.apache.pinot.spi.stream.MessageBatch;
 import org.apache.pinot.spi.stream.PartitionLevelConsumer;
 import org.apache.pinot.spi.stream.StreamConfig;
 import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 
 public class KafkaPartitionLevelConsumer extends KafkaPartitionLevelConnectionHandler
     implements PartitionLevelConsumer {
-  private static final Logger LOGGER = LoggerFactory.getLogger(KafkaPartitionLevelConsumer.class);
 
   public KafkaPartitionLevelConsumer(String clientId, StreamConfig streamConfig, int partition) {
     super(clientId, streamConfig, partition);
   }
 
   @Override
-  public MessageBatch fetchMessages(StreamPartitionMsgOffset startMsgOffset, StreamPartitionMsgOffset endMsgOffset,
-      int timeoutMillis)
-      throws TimeoutException {
+  public MessageBatch<byte[]> fetchMessages(StreamPartitionMsgOffset startMsgOffset,
+      StreamPartitionMsgOffset endMsgOffset, int timeoutMillis) {
     final long startOffset = ((LongMsgOffset) startMsgOffset).getOffset();
     final long endOffset = endMsgOffset == null ? Long.MAX_VALUE : ((LongMsgOffset) endMsgOffset).getOffset();
     return fetchMessages(startOffset, endOffset, timeoutMillis);
   }
 
-  public MessageBatch fetchMessages(long startOffset, long endOffset, int timeoutMillis)
-      throws TimeoutException {
+  public MessageBatch<byte[]> fetchMessages(long startOffset, long endOffset, int timeoutMillis) {
     _consumer.seek(_topicPartition, startOffset);
     ConsumerRecords<String, Bytes> consumerRecords = _consumer.poll(Duration.ofMillis(timeoutMillis));
-    final Iterable<ConsumerRecord<String, Bytes>> messageAndOffsetIterable =
-        buildOffsetFilteringIterable(consumerRecords.records(_topicPartition), startOffset, endOffset);
-    return new KafkaMessageBatch(messageAndOffsetIterable);
-  }
-
-  private Iterable<ConsumerRecord<String, Bytes>> buildOffsetFilteringIterable(
-      final List<ConsumerRecord<String, Bytes>> messageAndOffsets, final long startOffset, final long endOffset) {
-    return Iterables.filter(messageAndOffsets, input -> {
-      // Filter messages that are either null or have an offset ∉ [startOffset, endOffset]
-      return input != null && input.value() != null && input.offset() >= startOffset && (endOffset > input.offset()
-          || endOffset == -1);
-    });
-  }
-
-  @Override
-  public void close()
-      throws IOException {
-    super.close();
+    List<ConsumerRecord<String, Bytes>> messageAndOffsets = consumerRecords.records(_topicPartition);
+    List<MessageAndOffset> filtered = new ArrayList<>(messageAndOffsets.size());
+    long lastOffset = startOffset;
+    for (ConsumerRecord<String, Bytes> messageAndOffset : messageAndOffsets) {
+      if (messageAndOffset != null) {
+        Bytes message = messageAndOffset.value();
+        long offset = messageAndOffset.offset();
+        if (offset >= startOffset && (endOffset > offset || endOffset == -1)) {
+          if (message != null) {
+            filtered.add(new MessageAndOffset(message.get(), offset));
+          }
+          lastOffset = offset;
+        }
+      }
+    }
+    return new KafkaMessageBatch(messageAndOffsets.size(), lastOffset, filtered);

Review comment:
       `lastOffset` is used only in the `getOffsetOfNextBatch` method. I suggest we rename it to `offsetOfNextBatch` here and also in the `KafkaMessageBatch` class.
   Also, the last offset is only used when all messages are filtered out. In that case `offsetOfNextBatch` will basically always be `startOffset + 1`, which is inefficient: if there are N messages in the batch, we'll fetch the N-1 null messages again. We can use `startOffset + N` as the value for `offsetOfNextBatch`.
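
   Putting both suggestions together, a rough sketch of the loop body (assuming the polled offsets are contiguous from `startOffset`, which is what `startOffset + N` relies on, and with the rename also applied to the `KafkaMessageBatch` constructor):

   ```java
   List<ConsumerRecord<String, Bytes>> messageAndOffsets = consumerRecords.records(_topicPartition);
   List<MessageAndOffset> filtered = new ArrayList<>(messageAndOffsets.size());
   // Advance past every polled message so that a fully filtered batch is not
   // re-fetched one message at a time.
   long offsetOfNextBatch = startOffset + messageAndOffsets.size();
   for (ConsumerRecord<String, Bytes> messageAndOffset : messageAndOffsets) {
     Bytes message = messageAndOffset.value();
     long offset = messageAndOffset.offset();
     if (message != null && offset >= startOffset && (endOffset > offset || endOffset == -1)) {
       filtered.add(new MessageAndOffset(message.get(), offset));
     }
   }
   return new KafkaMessageBatch(messageAndOffsets.size(), offsetOfNextBatch, filtered);
   ```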




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org