You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@pinot.apache.org by ja...@apache.org on 2022/07/29 00:28:41 UTC

[pinot] branch master updated: Handle case when segment time threshold is crossed and 0 batch messages are processed (#9093)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 4084b6031b Handle case when segment time threshold is crossed and 0 batch messages are processed (#9093)
4084b6031b is described below

commit 4084b6031b8313cc77b515806485510f0351b8e7
Author: Daniel del Castillo <dd...@users.noreply.github.com>
AuthorDate: Fri Jul 29 01:28:36 2022 +0100

    Handle case when segment time threshold is crossed and 0 batch messages are processed (#9093)
---
 .../realtime/LLRealtimeSegmentDataManager.java     |  52 ++++--
 .../realtime/LLRealtimeSegmentDataManagerTest.java | 184 ++++++++++++++++++---
 .../fakestream/FakePartitionLevelConsumer.java     |   9 +-
 .../impl/fakestream/FakeStreamConfigUtils.java     |   3 +-
 .../impl/fakestream/FakeStreamConsumerFactory.java |   2 +-
 .../impl/fakestream/FakeStreamMessageBatch.java    |   6 +
 .../impl/fakestream/FakeStreamMessageDecoder.java  |   7 +-
 .../stream/kafka09/SimpleConsumerMessageBatch.java |   5 +
 .../segment/local/segment/creator/Fixtures.java    |   8 +-
 9 files changed, 228 insertions(+), 48 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index bc0e6a5932..6e62e6665e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -427,7 +427,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
         continue;
       }
 
-      processStreamEvents(messageBatch, idlePipeSleepTimeMillis);
+      boolean endCriteriaReached = processStreamEvents(messageBatch, idlePipeSleepTimeMillis);
 
       if (_currentOffset.compareTo(lastUpdatedOffset) != 0) {
         idle = false;
@@ -440,6 +440,14 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
         }
         _serverMetrics.setValueOfTableGauge(_metricKeyName, ServerGauge.LLC_PARTITION_CONSUMING, 1);
         lastUpdatedOffset = _streamPartitionMsgOffsetFactory.create(_currentOffset);
+      } else if (endCriteriaReached) {
+        // At this point current offset has not moved because processStreamEvents() has exited before processing a
+        // single message
+        if (_segmentLogger.isDebugEnabled()) {
+          _segmentLogger.debug("No messages processed before end criteria was reached. Staying at offset {}",
+              _currentOffset);
+        }
+        // We check this flag again further down
       } else if (messageBatch.getUnfilteredMessageCount() > 0) {
         idle = false;
         // we consumed something from the stream but filtered all the content out,
@@ -453,11 +461,11 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
       } else {
         // We did not consume any rows.
         if (!idle) {
-          idleStartTimeMillis = System.currentTimeMillis();
+          idleStartTimeMillis = now();
           idle = true;
         }
         if (idleTimeoutMillis >= 0) {
-          long totalIdleTimeMillis = System.currentTimeMillis() - idleStartTimeMillis;
+          long totalIdleTimeMillis = now() - idleStartTimeMillis;
           if (totalIdleTimeMillis > idleTimeoutMillis) {
             // Update the partition-consuming metric only if we have been idling beyond idle timeout.
             // Create a new stream consumer wrapper, in case we are stuck on something.
@@ -469,6 +477,11 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
           }
         }
       }
+
+      if (endCriteriaReached) {
+        // check this flag to avoid calling endCriteriaReached() at the beginning of the loop
+        break;
+      }
     }
 
     if (_numRowsErrored > 0) {
@@ -478,7 +491,13 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     return true;
   }
 
-  private void processStreamEvents(MessageBatch messagesAndOffsets, long idlePipeSleepTimeMillis) {
+  /**
+   * @param messagesAndOffsets batch of messages to process
+   * @param idlePipeSleepTimeMillis wait time in case no messages were read
+   * @return returns <code>true</code> if the process loop ended before processing the batch, <code>false</code>
+   * otherwise
+   */
+  private boolean processStreamEvents(MessageBatch messagesAndOffsets, long idlePipeSleepTimeMillis) {
 
     int messageCount = messagesAndOffsets.getMessageCount();
     _rateLimiter.throttle(messageCount);
@@ -492,8 +511,10 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
 
     GenericRow reuse = new GenericRow();
     TransformPipeline.Result reusedResult = new TransformPipeline.Result();
+    boolean prematureExit = false;
     for (int index = 0; index < messageCount; index++) {
-      if (_shouldStop || endCriteriaReached()) {
+      prematureExit = _shouldStop || endCriteriaReached();
+      if (prematureExit) {
         if (_segmentLogger.isDebugEnabled()) {
           _segmentLogger.debug("stop processing message batch early shouldStop: {}", _shouldStop);
         }
@@ -540,7 +561,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
           String errorMessage = String.format("Caught exception while transforming the record: %s", decodedRow);
           _segmentLogger.error(errorMessage, e);
           _realtimeTableDataManager.addSegmentError(_segmentNameStr,
-              new SegmentErrorInfo(System.currentTimeMillis(), errorMessage, e));
+              new SegmentErrorInfo(now(), errorMessage, e));
         }
         if (reusedResult.getSkippedRowCount() > 0) {
           realtimeRowsDroppedMeter =
@@ -559,7 +580,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
             String errorMessage = String.format("Caught exception while indexing the record: %s", transformedRow);
             _segmentLogger.error(errorMessage, e);
             _realtimeTableDataManager.addSegmentError(_segmentNameStr,
-                new SegmentErrorInfo(System.currentTimeMillis(), errorMessage, e));
+                new SegmentErrorInfo(now(), errorMessage, e));
           }
         }
       } else {
@@ -580,13 +601,14 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
         _segmentLogger.debug("Indexed {} messages ({} messages read from stream) current offset {}",
             indexedMessageCount, streamMessageCount, _currentOffset);
       }
-    } else {
+    } else if (!prematureExit) {
       if (_segmentLogger.isDebugEnabled()) {
         _segmentLogger.debug("empty batch received - sleeping for {}ms", idlePipeSleepTimeMillis);
       }
       // If there were no messages to be fetched from stream, wait for a little bit as to avoid hammering the stream
       Uninterruptibles.sleepUninterruptibly(idlePipeSleepTimeMillis, TimeUnit.MILLISECONDS);
     }
+    return prematureExit;
   }
 
   public class PartitionConsumer implements Runnable {
@@ -705,7 +727,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
         postStopConsumedMsg(e.getClass().getName());
         _state = State.ERROR;
         _realtimeTableDataManager
-            .addSegmentError(_segmentNameStr, new SegmentErrorInfo(System.currentTimeMillis(), errorMessage, e));
+            .addSegmentError(_segmentNameStr, new SegmentErrorInfo(now(), errorMessage, e));
         _serverMetrics.setValueOfTableGauge(_metricKeyName, ServerGauge.LLC_PARTITION_CONSUMING, 0);
         return;
       }
@@ -853,7 +875,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
         _segmentLogger.error("Could not build segment", e);
         FileUtils.deleteQuietly(tempSegmentFolder);
         _realtimeTableDataManager.addSegmentError(_segmentNameStr,
-            new SegmentErrorInfo(System.currentTimeMillis(), "Could not build segment", e));
+            new SegmentErrorInfo(now(), "Could not build segment", e));
         return null;
       }
       final long buildTimeMillis = now() - lockAcquireTimeMillis;
@@ -875,7 +897,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
             String.format("Caught exception while moving index directory from: %s to: %s", tempIndexDir, indexDir);
         _segmentLogger.error(errorMessage, e);
         _realtimeTableDataManager
-            .addSegmentError(_segmentNameStr, new SegmentErrorInfo(System.currentTimeMillis(), errorMessage, e));
+            .addSegmentError(_segmentNameStr, new SegmentErrorInfo(now(), errorMessage, e));
         return null;
       } finally {
         FileUtils.deleteQuietly(tempSegmentFolder);
@@ -896,7 +918,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
               String.format("Caught exception while taring index directory from: %s to: %s", indexDir, segmentTarFile);
           _segmentLogger.error(errorMessage, e);
           _realtimeTableDataManager
-              .addSegmentError(_segmentNameStr, new SegmentErrorInfo(System.currentTimeMillis(), errorMessage, e));
+              .addSegmentError(_segmentNameStr, new SegmentErrorInfo(now(), errorMessage, e));
           return null;
         }
 
@@ -906,7 +928,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
               V1Constants.MetadataKeys.METADATA_FILE_NAME, indexDir);
           _segmentLogger.error(errorMessage);
           _realtimeTableDataManager
-              .addSegmentError(_segmentNameStr, new SegmentErrorInfo(System.currentTimeMillis(), errorMessage, null));
+              .addSegmentError(_segmentNameStr, new SegmentErrorInfo(now(), errorMessage, null));
           return null;
         }
         File creationMetaFile = SegmentDirectoryPaths.findCreationMetaFile(indexDir);
@@ -915,7 +937,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
               V1Constants.SEGMENT_CREATION_META, indexDir);
           _segmentLogger.error(errorMessage);
           _realtimeTableDataManager
-              .addSegmentError(_segmentNameStr, new SegmentErrorInfo(System.currentTimeMillis(), errorMessage, null));
+              .addSegmentError(_segmentNameStr, new SegmentErrorInfo(now(), errorMessage, null));
           return null;
         }
         Map<String, File> metadataFiles = new HashMap<>();
@@ -932,7 +954,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
       String errorMessage = "Interrupted while waiting for semaphore";
       _segmentLogger.error(errorMessage, e);
       _realtimeTableDataManager
-          .addSegmentError(_segmentNameStr, new SegmentErrorInfo(System.currentTimeMillis(), errorMessage, e));
+          .addSegmentError(_segmentNameStr, new SegmentErrorInfo(now(), errorMessage, e));
       return null;
     } finally {
       if (_segBuildSemaphore != null) {
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
index 50cf2a842f..ac8964bb32 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
@@ -28,7 +28,10 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
+import javax.annotation.Nullable;
 import org.apache.commons.io.FileUtils;
 import org.apache.helix.HelixManager;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
@@ -38,6 +41,7 @@ import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
 import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.common.utils.config.TableConfigUtils;
 import org.apache.pinot.core.data.manager.offline.TableDataManagerProvider;
+import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
 import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConsumerFactory;
 import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamMessageDecoder;
 import org.apache.pinot.segment.local.data.manager.TableDataManager;
@@ -53,6 +57,7 @@ import org.apache.pinot.spi.metrics.PinotMetricUtils;
 import org.apache.pinot.spi.stream.LongMsgOffset;
 import org.apache.pinot.spi.stream.LongMsgOffsetFactory;
 import org.apache.pinot.spi.stream.PermanentConsumerException;
+import org.apache.pinot.spi.stream.StreamConfigProperties;
 import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.util.TestUtils;
@@ -73,16 +78,15 @@ public class LLRealtimeSegmentDataManagerTest {
   private static final String SEGMENT_DIR = "/tmp/" + LLRealtimeSegmentDataManagerTest.class.getSimpleName();
   private static final File SEGMENT_DIR_FILE = new File(SEGMENT_DIR);
   private static final String TABLE_NAME = "Coffee";
-  private static final int PARTITION_GROUP_ID = 13;
+  private static final int PARTITION_GROUP_ID = 0;
   private static final int SEQUENCE_ID = 945;
   private static final long SEG_TIME_MS = 98347869999L;
   private static final LLCSegmentName SEGMENT_NAME =
       new LLCSegmentName(TABLE_NAME, PARTITION_GROUP_ID, SEQUENCE_ID, SEG_TIME_MS);
   private static final String SEGMENT_NAME_STR = SEGMENT_NAME.getSegmentName();
-  private static final long START_OFFSET_VALUE = 19885L;
+  private static final long START_OFFSET_VALUE = 198L;
   private static final LongMsgOffset START_OFFSET = new LongMsgOffset(START_OFFSET_VALUE);
 
-  private static long _timeNow = System.currentTimeMillis();
   private final Map<Integer, Semaphore> _partitionGroupIdToSemaphoreMap = new ConcurrentHashMap<>();
 
   private static TableConfig createTableConfig()
@@ -114,15 +118,32 @@ public class LLRealtimeSegmentDataManagerTest {
 
   private FakeLLRealtimeSegmentDataManager createFakeSegmentManager()
       throws Exception {
+    return createFakeSegmentManager(false, new TimeSupplier(), null, null);
+  }
+
+  private FakeLLRealtimeSegmentDataManager createFakeSegmentManager(boolean noUpsert, TimeSupplier timeSupplier,
+      @Nullable String maxRows, @Nullable String maxDuration)
+      throws Exception {
     SegmentZKMetadata segmentZKMetadata = createZkMetadata();
     TableConfig tableConfig = createTableConfig();
+    if (noUpsert) {
+      tableConfig.setUpsertConfig(null);
+    }
+    if (maxRows != null) {
+      tableConfig.getIndexingConfig().getStreamConfigs()
+          .put(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_ROWS, maxRows);
+    }
+    if (maxDuration != null) {
+      tableConfig.getIndexingConfig().getStreamConfigs()
+          .put(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_TIME, maxDuration);
+    }
     RealtimeTableDataManager tableDataManager = createTableDataManager(tableConfig);
     LLCSegmentName llcSegmentName = new LLCSegmentName(SEGMENT_NAME_STR);
     _partitionGroupIdToSemaphoreMap.putIfAbsent(PARTITION_GROUP_ID, new Semaphore(1));
     Schema schema = Fixtures.createSchema();
     ServerMetrics serverMetrics = new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry());
     return new FakeLLRealtimeSegmentDataManager(segmentZKMetadata, tableConfig, tableDataManager, SEGMENT_DIR, schema,
-        llcSegmentName, _partitionGroupIdToSemaphoreMap, serverMetrics);
+        llcSegmentName, _partitionGroupIdToSemaphoreMap, serverMetrics, timeSupplier);
   }
 
   @BeforeClass
@@ -143,7 +164,7 @@ public class LLRealtimeSegmentDataManagerTest {
     final String offset = "34";
     FakeLLRealtimeSegmentDataManager segmentDataManager = createFakeSegmentManager();
     {
-      //  Controller sends catchup response with both offset as well as streamPartitionMsgOffset
+      //  Controller sends catchup response with both offset and streamPartitionMsgOffset
       String responseStr =
           "{" + "  \"streamPartitionMsgOffset\" : \"" + offset + "\"," + "  \"offset\" : " + offset + ","
               + "  \"buildTimeSec\" : -1," + "  \"isSplitCommitType\" : false,"
@@ -513,11 +534,11 @@ public class LLRealtimeSegmentDataManagerTest {
       segmentDataManager._state.set(segmentDataManager, LLRealtimeSegmentDataManager.State.INITIAL_CONSUMING);
       Assert.assertFalse(segmentDataManager.invokeEndCriteriaReached());
       // We should still get false because there is no messages fetched
-      _timeNow += Fixtures.MAX_TIME_FOR_SEGMENT_CLOSE_MS + 1;
+      segmentDataManager._timeSupplier.add(Fixtures.MAX_TIME_FOR_SEGMENT_CLOSE_MS + 1);
       Assert.assertFalse(segmentDataManager.invokeEndCriteriaReached());
       // Once there are messages fetched, and the time exceeds the extended hour, we should get true
       setHasMessagesFetched(segmentDataManager, true);
-      _timeNow += TimeUnit.HOURS.toMillis(1);
+      segmentDataManager._timeSupplier.add(TimeUnit.HOURS.toMillis(1));
       Assert.assertTrue(segmentDataManager.invokeEndCriteriaReached());
       Assert.assertEquals(segmentDataManager.getStopReason(), SegmentCompletionProtocol.REASON_TIME_LIMIT);
       segmentDataManager.destroy();
@@ -537,7 +558,7 @@ public class LLRealtimeSegmentDataManagerTest {
     // In catching up state, test reaching final offset ignoring time
     {
       FakeLLRealtimeSegmentDataManager segmentDataManager = createFakeSegmentManager();
-      _timeNow += Fixtures.MAX_TIME_FOR_SEGMENT_CLOSE_MS;
+      segmentDataManager._timeSupplier.add(Fixtures.MAX_TIME_FOR_SEGMENT_CLOSE_MS);
       segmentDataManager._state.set(segmentDataManager, LLRealtimeSegmentDataManager.State.CATCHING_UP);
       final long finalOffset = START_OFFSET_VALUE + 100;
       segmentDataManager.setFinalOffset(finalOffset);
@@ -551,9 +572,9 @@ public class LLRealtimeSegmentDataManagerTest {
     // Case 1. We have reached final offset.
     {
       FakeLLRealtimeSegmentDataManager segmentDataManager = createFakeSegmentManager();
-      _timeNow += 1;
+      segmentDataManager._timeSupplier.add(1);
       segmentDataManager._state.set(segmentDataManager, LLRealtimeSegmentDataManager.State.CONSUMING_TO_ONLINE);
-      segmentDataManager.setConsumeEndTime(_timeNow + 10);
+      segmentDataManager.setConsumeEndTime(segmentDataManager._timeSupplier.get() + 10);
       final long finalOffset = START_OFFSET_VALUE + 100;
       segmentDataManager.setFinalOffset(finalOffset);
       segmentDataManager.setCurrentOffset(finalOffset - 1);
@@ -566,14 +587,14 @@ public class LLRealtimeSegmentDataManagerTest {
     {
       FakeLLRealtimeSegmentDataManager segmentDataManager = createFakeSegmentManager();
       segmentDataManager._state.set(segmentDataManager, LLRealtimeSegmentDataManager.State.CONSUMING_TO_ONLINE);
-      final long endTime = _timeNow + 10;
+      final long endTime = segmentDataManager._timeSupplier.get() + 10;
       segmentDataManager.setConsumeEndTime(endTime);
       final long finalOffset = START_OFFSET_VALUE + 100;
       segmentDataManager.setFinalOffset(finalOffset);
       segmentDataManager.setCurrentOffset(finalOffset - 1);
-      _timeNow = endTime - 1;
+      segmentDataManager._timeSupplier.set(endTime - 1);
       Assert.assertFalse(segmentDataManager.invokeEndCriteriaReached());
-      _timeNow = endTime;
+      segmentDataManager._timeSupplier.set(endTime);
       Assert.assertTrue(segmentDataManager.invokeEndCriteriaReached());
       segmentDataManager.destroy();
     }
@@ -735,12 +756,118 @@ public class LLRealtimeSegmentDataManagerTest {
     Assert.assertFalse(SegmentBuildTimeLeaseExtender.isExecutorShutdown());
   }
 
+  @Test
+  public void testShouldNotSkipUnfilteredMessagesIfNotIndexedAndTimeThresholdIsReached()
+      throws Exception {
+    final int segmentTimeThresholdMins = 10;
+    TimeSupplier timeSupplier = new TimeSupplier() {
+      @Override
+      public Long get() {
+        long now = System.currentTimeMillis();
+        // now() is called once in the run() method, once before each batch reading and once for every row indexation
+        if (_timeCheckCounter.incrementAndGet() <= FakeStreamConfigUtils.SEGMENT_FLUSH_THRESHOLD_ROWS + 4) {
+          return now;
+        }
+        // Exceed segment time threshold
+        return now + TimeUnit.MINUTES.toMillis(segmentTimeThresholdMins + 1);
+      }
+    };
+    FakeLLRealtimeSegmentDataManager segmentDataManager = createFakeSegmentManager(true, timeSupplier,
+        String.valueOf(FakeStreamConfigUtils.SEGMENT_FLUSH_THRESHOLD_ROWS * 2), segmentTimeThresholdMins + "m");
+    segmentDataManager._stubConsumeLoop = false;
+    segmentDataManager._state.set(segmentDataManager, LLRealtimeSegmentDataManager.State.INITIAL_CONSUMING);
+
+    LLRealtimeSegmentDataManager.PartitionConsumer consumer = segmentDataManager.createPartitionConsumer();
+    final LongMsgOffset endOffset =
+        new LongMsgOffset(START_OFFSET_VALUE + FakeStreamConfigUtils.SEGMENT_FLUSH_THRESHOLD_ROWS);
+    segmentDataManager._consumeOffsets.add(endOffset);
+    final SegmentCompletionProtocol.Response response = new SegmentCompletionProtocol.Response(
+        new SegmentCompletionProtocol.Response.Params().withStatus(
+                SegmentCompletionProtocol.ControllerResponseStatus.COMMIT)
+            .withStreamPartitionMsgOffset(endOffset.toString()));
+    segmentDataManager._responses.add(response);
+
+    consumer.run();
+
+    try {
+      // millis() is called first in run before consumption, then once for each batch and once for each message in
+      // the batch, then once more when metrics are updated after each batch is processed and then 4 more times in
+      // run() after consume loop
+      Assert.assertEquals(timeSupplier._timeCheckCounter.get(), FakeStreamConfigUtils.SEGMENT_FLUSH_THRESHOLD_ROWS + 8);
+      Assert.assertEquals(((LongMsgOffset) segmentDataManager.getCurrentOffset()).getOffset(),
+          START_OFFSET_VALUE + FakeStreamConfigUtils.SEGMENT_FLUSH_THRESHOLD_ROWS);
+      Assert.assertEquals(segmentDataManager.getSegment().getNumDocsIndexed(),
+          FakeStreamConfigUtils.SEGMENT_FLUSH_THRESHOLD_ROWS);
+      Assert.assertEquals(segmentDataManager.getSegment().getSegmentMetadata().getTotalDocs(),
+          FakeStreamConfigUtils.SEGMENT_FLUSH_THRESHOLD_ROWS);
+    } finally {
+      segmentDataManager.destroy();
+    }
+  }
+
+  @Test
+  public void testShouldNotSkipUnfilteredMessagesIfNotIndexedAndRowCountThresholdIsReached()
+      throws Exception {
+    final int segmentTimeThresholdMins = 10;
+    TimeSupplier timeSupplier = new TimeSupplier();
+    FakeLLRealtimeSegmentDataManager segmentDataManager =
+        createFakeSegmentManager(true, timeSupplier, String.valueOf(FakeStreamConfigUtils.SEGMENT_FLUSH_THRESHOLD_ROWS),
+            segmentTimeThresholdMins + "m");
+    segmentDataManager._stubConsumeLoop = false;
+    segmentDataManager._state.set(segmentDataManager, LLRealtimeSegmentDataManager.State.INITIAL_CONSUMING);
+
+    LLRealtimeSegmentDataManager.PartitionConsumer consumer = segmentDataManager.createPartitionConsumer();
+    final LongMsgOffset endOffset =
+        new LongMsgOffset(START_OFFSET_VALUE + FakeStreamConfigUtils.SEGMENT_FLUSH_THRESHOLD_ROWS);
+    segmentDataManager._consumeOffsets.add(endOffset);
+    final SegmentCompletionProtocol.Response response = new SegmentCompletionProtocol.Response(
+        new SegmentCompletionProtocol.Response.Params().withStatus(
+                SegmentCompletionProtocol.ControllerResponseStatus.COMMIT)
+            .withStreamPartitionMsgOffset(endOffset.toString()));
+    segmentDataManager._responses.add(response);
+
+    consumer.run();
+
+    try {
+      // millis() is called first in run before consumption, then once for each batch and once for each message in
+      // the batch, then once for metrics updates and then 4 more times in run() after consume loop
+      Assert.assertEquals(timeSupplier._timeCheckCounter.get(), FakeStreamConfigUtils.SEGMENT_FLUSH_THRESHOLD_ROWS + 6);
+      Assert.assertEquals(((LongMsgOffset) segmentDataManager.getCurrentOffset()).getOffset(),
+          START_OFFSET_VALUE + FakeStreamConfigUtils.SEGMENT_FLUSH_THRESHOLD_ROWS);
+      Assert.assertEquals(segmentDataManager.getSegment().getNumDocsIndexed(),
+          FakeStreamConfigUtils.SEGMENT_FLUSH_THRESHOLD_ROWS);
+      Assert.assertEquals(segmentDataManager.getSegment().getSegmentMetadata().getTotalDocs(),
+          FakeStreamConfigUtils.SEGMENT_FLUSH_THRESHOLD_ROWS);
+    } finally {
+      segmentDataManager.destroy();
+    }
+  }
+
+  private static class TimeSupplier implements Supplier<Long> {
+    protected final AtomicInteger _timeCheckCounter = new AtomicInteger();
+    protected long _timeNow = System.currentTimeMillis();
+
+    @Override
+    public Long get() {
+      _timeCheckCounter.incrementAndGet();
+      return _timeNow;
+    }
+
+    public void set(long millis) {
+      _timeNow = millis;
+    }
+
+    public void add(long millis) {
+      _timeNow += millis;
+    }
+  }
+
   public static class FakeLLRealtimeSegmentDataManager extends LLRealtimeSegmentDataManager {
 
     public Field _state;
     public Field _shouldStop;
     public Field _stopReason;
-    private final Field _streamMsgOffsetFactory;
+    private Field _streamMsgOffsetFactory;
     public LinkedList<LongMsgOffset> _consumeOffsets = new LinkedList<>();
     public LinkedList<SegmentCompletionProtocol.Response> _responses = new LinkedList<>();
     public boolean _commitSegmentCalled = false;
@@ -752,6 +879,8 @@ public class LLRealtimeSegmentDataManagerTest {
     public boolean _throwExceptionFromConsume = false;
     public boolean _postConsumeStoppedCalled = false;
     public Map<Integer, Semaphore> _semaphoreMap;
+    public boolean _stubConsumeLoop = true;
+    private TimeSupplier _timeSupplier;
 
     private static InstanceDataManagerConfig makeInstanceDataManagerConfig() {
       InstanceDataManagerConfig dataManagerConfig = mock(InstanceDataManagerConfig.class);
@@ -766,7 +895,8 @@ public class LLRealtimeSegmentDataManagerTest {
 
     public FakeLLRealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableConfig tableConfig,
         RealtimeTableDataManager realtimeTableDataManager, String resourceDataDir, Schema schema,
-        LLCSegmentName llcSegmentName, Map<Integer, Semaphore> semaphoreMap, ServerMetrics serverMetrics)
+        LLCSegmentName llcSegmentName, Map<Integer, Semaphore> semaphoreMap, ServerMetrics serverMetrics,
+        TimeSupplier timeSupplier)
         throws Exception {
       super(segmentZKMetadata, tableConfig, realtimeTableDataManager, resourceDataDir,
           new IndexLoadingConfig(makeInstanceDataManagerConfig(), tableConfig), schema, llcSegmentName,
@@ -781,6 +911,7 @@ public class LLRealtimeSegmentDataManagerTest {
       _streamMsgOffsetFactory = LLRealtimeSegmentDataManager.class.getDeclaredField("_streamPartitionMsgOffsetFactory");
       _streamMsgOffsetFactory.setAccessible(true);
       _streamMsgOffsetFactory.set(this, new LongMsgOffsetFactory());
+      _timeSupplier = timeSupplier;
     }
 
     public String getStopReason() {
@@ -826,12 +957,15 @@ public class LLRealtimeSegmentDataManagerTest {
     @Override
     protected boolean consumeLoop()
         throws Exception {
-      if (_throwExceptionFromConsume) {
-        throw new PermanentConsumerException(new Throwable("Offset out of range"));
+      if (_stubConsumeLoop) {
+        if (_throwExceptionFromConsume) {
+          throw new PermanentConsumerException(new Throwable("Offset out of range"));
+        }
+        setCurrentOffset(_consumeOffsets.remove().getOffset());
+        terminateLoopIfNecessary();
+        return true;
       }
-      setCurrentOffset(_consumeOffsets.remove().getOffset());
-      terminateLoopIfNecessary();
-      return true;
+      return super.consumeLoop();
     }
 
     @Override
@@ -854,12 +988,16 @@ public class LLRealtimeSegmentDataManagerTest {
 
     @Override
     protected long now() {
-      return _timeNow;
+      // now() is called in the constructor before _timeSupplier is set
+      if (_timeSupplier == null) {
+        return System.currentTimeMillis();
+      }
+      return _timeSupplier.get();
     }
 
     @Override
     protected void hold() {
-      _timeNow += 5000L;
+      _timeSupplier.add(5000L);
     }
 
     @Override
@@ -899,7 +1037,7 @@ public class LLRealtimeSegmentDataManagerTest {
 
     @Override
     public void stop() {
-      _timeNow += _stopWaitTimeMs;
+      _timeSupplier.add(_stopWaitTimeMs);
     }
 
     public void setCurrentOffset(long offset) {
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakePartitionLevelConsumer.java b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakePartitionLevelConsumer.java
index 7f2e603231..cbb77066f2 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakePartitionLevelConsumer.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakePartitionLevelConsumer.java
@@ -53,9 +53,10 @@ public class FakePartitionLevelConsumer implements PartitionLevelConsumer {
 
   private List<Integer> _messageOffsets = new ArrayList<>();
   private List<byte[]> _messageBytes = new ArrayList<>();
+  private final int _defaultBatchSize;
 
-  FakePartitionLevelConsumer(int partition, StreamConfig streamConfig) {
-
+  FakePartitionLevelConsumer(int partition, StreamConfig streamConfig, int defaultBatchSize) {
+    _defaultBatchSize = defaultBatchSize;
     // TODO: this logic can move to a FakeStreamProducer instead of being inside the Consumer
     File tempDir = new File(FileUtils.getTempDirectory(), getClass().getSimpleName());
     File outputDir = new File(tempDir, String.valueOf(partition));
@@ -114,6 +115,10 @@ public class FakePartitionLevelConsumer implements PartitionLevelConsumer {
     }
     int startOffsetInt = (int) ((LongMsgOffset) startOffset).getOffset();
     int endOffsetInt = (int) ((LongMsgOffset) endOffset).getOffset();
+    if (endOffsetInt > _messageOffsets.size() && _defaultBatchSize > 0) {
+      // Hack to get multiple batches
+      endOffsetInt = startOffsetInt + _defaultBatchSize;
+    }
     return new FakeStreamMessageBatch(_messageOffsets.subList(startOffsetInt, endOffsetInt),
         _messageBytes.subList(startOffsetInt, endOffsetInt));
   }
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConfigUtils.java b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConfigUtils.java
index d84261cd44..75de0fbee9 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConfigUtils.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConfigUtils.java
@@ -63,7 +63,8 @@ public class FakeStreamConfigUtils {
   private static final String CONSUMER_FACTORY_CLASS = FakeStreamConsumerFactory.class.getName();
   private static final String OFFSET_CRITERIA = "smallest";
   private static final String DECODER_CLASS = FakeStreamMessageDecoder.class.getName();
-  private static final int SEGMENT_FLUSH_THRESHOLD_ROWS = 500;
+  public static final int SEGMENT_FLUSH_THRESHOLD_ROWS = 500;
+  public static final int MESSAGE_BATCH_SIZE = SEGMENT_FLUSH_THRESHOLD_ROWS;
 
   /**
    * Gets default num partitions
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java
index e4c5301215..1923a4d822 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java
@@ -47,7 +47,7 @@ public class FakeStreamConsumerFactory extends StreamConsumerFactory {
 
   @Override
   public PartitionLevelConsumer createPartitionLevelConsumer(String clientId, int partition) {
-    return new FakePartitionLevelConsumer(partition, _streamConfig);
+    return new FakePartitionLevelConsumer(partition, _streamConfig, FakeStreamConfigUtils.MESSAGE_BATCH_SIZE);
   }
 
   @Override
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMessageBatch.java b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMessageBatch.java
index 8348435f1a..134aac9fd8 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMessageBatch.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMessageBatch.java
@@ -36,26 +36,32 @@ public class FakeStreamMessageBatch implements MessageBatch<byte[]> {
     _messageBytes = messageBytes;
   }
 
+  @Override
   public int getMessageCount() {
     return _messageOffsets.size();
   }
 
+  @Override
   public byte[] getMessageAtIndex(int index) {
     return _messageBytes.get(index);
   }
 
+  @Override
   public int getMessageOffsetAtIndex(int index) {
     return _messageOffsets.get(index);
   }
 
+  @Override
   public int getMessageLengthAtIndex(int index) {
     return _messageBytes.get(index).length;
   }
 
+  @Override
   public long getNextStreamMessageOffsetAtIndex(int index) {
     throw new UnsupportedOperationException("This method is deprecated");
   }
 
+  @Override
   public StreamPartitionMsgOffset getNextStreamPartitionMsgOffsetAtIndex(int index) {
     return new LongMsgOffset(_messageOffsets.get(index) + 1);
   }
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMessageDecoder.java b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMessageDecoder.java
index 10f358fb8a..07eacfbf3e 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMessageDecoder.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMessageDecoder.java
@@ -20,6 +20,7 @@ package org.apache.pinot.core.realtime.impl.fakestream;
 
 import java.util.Map;
 import java.util.Set;
+import org.apache.pinot.segment.local.segment.creator.Fixtures;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.stream.StreamMessageDecoder;
 
@@ -35,11 +36,13 @@ public class FakeStreamMessageDecoder implements StreamMessageDecoder<byte[]> {
 
   @Override
   public GenericRow decode(byte[] payload, GenericRow destination) {
-    return null;
+    GenericRow row = Fixtures.createSingleRow(System.currentTimeMillis());
+    destination.init(row);
+    return destination;
   }
 
   @Override
   public GenericRow decode(byte[] payload, int offset, int length, GenericRow destination) {
-    return null;
+    return decode(payload, destination);
   }
 }
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/SimpleConsumerMessageBatch.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/SimpleConsumerMessageBatch.java
index b53546c1b8..5e36a22d09 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/SimpleConsumerMessageBatch.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/SimpleConsumerMessageBatch.java
@@ -35,18 +35,22 @@ public class SimpleConsumerMessageBatch implements MessageBatch<byte[]> {
     }
   }
 
+  @Override
   public int getMessageCount() {
     return _messageList.size();
   }
 
+  @Override
   public byte[] getMessageAtIndex(int index) {
     return _messageList.get(index).message().payload().array();
   }
 
+  @Override
   public int getMessageOffsetAtIndex(int index) {
     return _messageList.get(index).message().payload().arrayOffset();
   }
 
+  @Override
   public int getMessageLengthAtIndex(int index) {
     return _messageList.get(index).message().payloadSize();
   }
@@ -56,6 +60,7 @@ public class SimpleConsumerMessageBatch implements MessageBatch<byte[]> {
     throw new UnsupportedOperationException("This method is deprecated");
   }
 
+  @Override
   public StreamPartitionMsgOffset getNextStreamPartitionMsgOffsetAtIndex(int index) {
     return new LongMsgOffset(_messageList.get(index).nextOffset());
   }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/Fixtures.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/Fixtures.java
index 934c097b1a..3b1f9ba22b 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/Fixtures.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/Fixtures.java
@@ -109,7 +109,7 @@ public class Fixtures {
     return Schema.fromString(SCHEMA_JSON);
   }
 
-  public static GenericRow createSingleRow(int randomSeed) {
+  public static GenericRow createSingleRow(long randomSeed) {
     Random rand = new Random(randomSeed);
     int colValue = rand.nextInt(Integer.MAX_VALUE);
     GenericRow retVal = new GenericRow();
@@ -120,7 +120,7 @@ public class Fixtures {
     return retVal;
   }
 
-  public static GenericRow createInvalidSingleRow(int randomSeed) {
+  public static GenericRow createInvalidSingleRow(long randomSeed) {
     Random rand = new Random(randomSeed);
     int colValue = rand.nextInt(Integer.MAX_VALUE);
     GenericRow retVal = new GenericRow();
@@ -131,7 +131,7 @@ public class Fixtures {
     return retVal;
   }
 
-  public static GenericRow createMultipleRow(int randomSeed) {
+  public static GenericRow createMultipleRow(long randomSeed) {
     Random rand = new Random(randomSeed);
     GenericRow firstRow = createSingleRow(randomSeed);
     GenericRow secondRow = createSingleRow(rand.nextInt(Integer.MAX_VALUE));
@@ -141,7 +141,7 @@ public class Fixtures {
     return retVal;
   }
 
-  public static GenericRow createMultipleRowPartialFailure(int randomSeed) {
+  public static GenericRow createMultipleRowPartialFailure(long randomSeed) {
     Random rand = new Random(randomSeed);
     GenericRow firstRow = createSingleRow(randomSeed);
     GenericRow secondRow = createInvalidSingleRow(rand.nextInt(Integer.MAX_VALUE));


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org