You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this copy.
Posted to commits@pinot.apache.org by ja...@apache.org on 2022/07/23 04:31:03 UTC

[pinot] branch master updated: Allow committing empty segment when some messages are fetched but all filtered out (#9089)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 1211f07ed9 Allow committing empty segment when some messages are fetched but all filtered out (#9089)
1211f07ed9 is described below

commit 1211f07ed9602e130da0e83368fcc465519f0fcc
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Fri Jul 22 21:30:58 2022 -0700

    Allow committing empty segment when some messages are fetched but all filtered out (#9089)
---
 .../realtime/LLRealtimeSegmentDataManager.java     | 10 ++++++----
 .../realtime/LLRealtimeSegmentDataManagerTest.java | 23 ++++++++++------------
 2 files changed, 16 insertions(+), 17 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index fdfe31377e..bc0e6a5932 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -243,6 +243,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
 
   // Segment end criteria
   private volatile long _consumeEndTime = 0;
+  private volatile boolean _hasMessagesFetched = false;
   private volatile boolean _endOfPartitionGroup = false;
   private volatile boolean _forceCommitMessageReceived = false;
   private StreamPartitionMsgOffset _finalOffset; // Used when we want to catch up to this one
@@ -302,7 +303,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
         //   - partition group is ended
         //   - force commit message has been received
         if (now >= _consumeEndTime) {
-          if (_realtimeSegment.getNumDocsIndexed() == 0) {
+          if (!_hasMessagesFetched) {
             _segmentLogger.info("No events came in, extending time by {} hours", TIME_EXTENSION_ON_EMPTY_SEGMENT_HOURS);
             _consumeEndTime += TimeUnit.HOURS.toMillis(TIME_EXTENSION_ON_EMPTY_SEGMENT_HOURS);
             return false;
@@ -573,12 +574,13 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
       streamMessageCount++;
     }
     updateCurrentDocumentCountMetrics();
-    if (streamMessageCount != 0) {
-      if (_segmentLogger.isDebugEnabled()) {
+    if (messagesAndOffsets.getUnfilteredMessageCount() > 0) {
+      _hasMessagesFetched = true;
+      if (streamMessageCount > 0 && _segmentLogger.isDebugEnabled()) {
         _segmentLogger.debug("Indexed {} messages ({} messages read from stream) current offset {}",
             indexedMessageCount, streamMessageCount, _currentOffset);
       }
-    } else if (messagesAndOffsets.getUnfilteredMessageCount() == 0) {
+    } else {
       if (_segmentLogger.isDebugEnabled()) {
         _segmentLogger.debug("empty batch received - sleeping for {}ms", idlePipeSleepTimeMillis);
       }
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
index 1e8d70d331..50cf2a842f 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
@@ -27,6 +27,7 @@ import java.util.LinkedList;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.commons.io.FileUtils;
 import org.apache.helix.HelixManager;
@@ -41,7 +42,6 @@ import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConsumerFactory;
 import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamMessageDecoder;
 import org.apache.pinot.segment.local.data.manager.TableDataManager;
 import org.apache.pinot.segment.local.data.manager.TableDataManagerConfig;
-import org.apache.pinot.segment.local.indexsegment.mutable.MutableSegmentImpl;
 import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentStatsHistory;
 import org.apache.pinot.segment.local.segment.creator.Fixtures;
 import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
@@ -512,12 +512,12 @@ public class LLRealtimeSegmentDataManagerTest {
       FakeLLRealtimeSegmentDataManager segmentDataManager = createFakeSegmentManager();
       segmentDataManager._state.set(segmentDataManager, LLRealtimeSegmentDataManager.State.INITIAL_CONSUMING);
       Assert.assertFalse(segmentDataManager.invokeEndCriteriaReached());
+      // We should still get false because there is no messages fetched
       _timeNow += Fixtures.MAX_TIME_FOR_SEGMENT_CLOSE_MS + 1;
-      // We should still get false, since the number of records in the realtime segment is 0
       Assert.assertFalse(segmentDataManager.invokeEndCriteriaReached());
-      replaceRealtimeSegment(segmentDataManager, 10);
-      // Now we can test when we are far ahead in time
-      _timeNow += Fixtures.MAX_TIME_FOR_SEGMENT_CLOSE_MS;
+      // Once there are messages fetched, and the time exceeds the extended hour, we should get true
+      setHasMessagesFetched(segmentDataManager, true);
+      _timeNow += TimeUnit.HOURS.toMillis(1);
       Assert.assertTrue(segmentDataManager.invokeEndCriteriaReached());
       Assert.assertEquals(segmentDataManager.getStopReason(), SegmentCompletionProtocol.REASON_TIME_LIMIT);
       segmentDataManager.destroy();
@@ -579,14 +579,11 @@ public class LLRealtimeSegmentDataManagerTest {
     }
   }
 
-  // Replace the realtime segment with a mock that returns numDocs for raw doc count.
-  private void replaceRealtimeSegment(FakeLLRealtimeSegmentDataManager segmentDataManager, int numDocs)
+  private void setHasMessagesFetched(FakeLLRealtimeSegmentDataManager segmentDataManager, boolean hasMessagesFetched)
       throws Exception {
-    MutableSegmentImpl mockSegmentImpl = mock(MutableSegmentImpl.class);
-    when(mockSegmentImpl.getNumDocsIndexed()).thenReturn(numDocs);
-    Field segmentImpl = LLRealtimeSegmentDataManager.class.getDeclaredField("_realtimeSegment");
-    segmentImpl.setAccessible(true);
-    segmentImpl.set(segmentDataManager, mockSegmentImpl);
+    Field field = LLRealtimeSegmentDataManager.class.getDeclaredField("_hasMessagesFetched");
+    field.setAccessible(true);
+    field.set(segmentDataManager, hasMessagesFetched);
   }
 
   // If commit fails, make sure that we do not re-build the segment when we try to commit again.
@@ -743,7 +740,7 @@ public class LLRealtimeSegmentDataManagerTest {
     public Field _state;
     public Field _shouldStop;
     public Field _stopReason;
-    private Field _streamMsgOffsetFactory;
+    private final Field _streamMsgOffsetFactory;
     public LinkedList<LongMsgOffset> _consumeOffsets = new LinkedList<>();
     public LinkedList<SegmentCompletionProtocol.Response> _responses = new LinkedList<>();
     public boolean _commitSegmentCalled = false;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org