You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2022/07/23 04:31:03 UTC
[pinot] branch master updated: Allow committing empty segment when some messages are fetched but all filtered out (#9089)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 1211f07ed9 Allow committing empty segment when some messages are fetched but all filtered out (#9089)
1211f07ed9 is described below
commit 1211f07ed9602e130da0e83368fcc465519f0fcc
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Fri Jul 22 21:30:58 2022 -0700
Allow committing empty segment when some messages are fetched but all filtered out (#9089)
---
.../realtime/LLRealtimeSegmentDataManager.java | 10 ++++++----
.../realtime/LLRealtimeSegmentDataManagerTest.java | 23 ++++++++++------------
2 files changed, 16 insertions(+), 17 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index fdfe31377e..bc0e6a5932 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -243,6 +243,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
// Segment end criteria
private volatile long _consumeEndTime = 0;
+ private volatile boolean _hasMessagesFetched = false;
private volatile boolean _endOfPartitionGroup = false;
private volatile boolean _forceCommitMessageReceived = false;
private StreamPartitionMsgOffset _finalOffset; // Used when we want to catch up to this one
@@ -302,7 +303,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
// - partition group is ended
// - force commit message has been received
if (now >= _consumeEndTime) {
- if (_realtimeSegment.getNumDocsIndexed() == 0) {
+ if (!_hasMessagesFetched) {
_segmentLogger.info("No events came in, extending time by {} hours", TIME_EXTENSION_ON_EMPTY_SEGMENT_HOURS);
_consumeEndTime += TimeUnit.HOURS.toMillis(TIME_EXTENSION_ON_EMPTY_SEGMENT_HOURS);
return false;
@@ -573,12 +574,13 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
streamMessageCount++;
}
updateCurrentDocumentCountMetrics();
- if (streamMessageCount != 0) {
- if (_segmentLogger.isDebugEnabled()) {
+ if (messagesAndOffsets.getUnfilteredMessageCount() > 0) {
+ _hasMessagesFetched = true;
+ if (streamMessageCount > 0 && _segmentLogger.isDebugEnabled()) {
_segmentLogger.debug("Indexed {} messages ({} messages read from stream) current offset {}",
indexedMessageCount, streamMessageCount, _currentOffset);
}
- } else if (messagesAndOffsets.getUnfilteredMessageCount() == 0) {
+ } else {
if (_segmentLogger.isDebugEnabled()) {
_segmentLogger.debug("empty batch received - sleeping for {}ms", idlePipeSleepTimeMillis);
}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
index 1e8d70d331..50cf2a842f 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
@@ -27,6 +27,7 @@ import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.io.FileUtils;
import org.apache.helix.HelixManager;
@@ -41,7 +42,6 @@ import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConsumerFactory;
import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamMessageDecoder;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
import org.apache.pinot.segment.local.data.manager.TableDataManagerConfig;
-import org.apache.pinot.segment.local.indexsegment.mutable.MutableSegmentImpl;
import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentStatsHistory;
import org.apache.pinot.segment.local.segment.creator.Fixtures;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
@@ -512,12 +512,12 @@ public class LLRealtimeSegmentDataManagerTest {
FakeLLRealtimeSegmentDataManager segmentDataManager = createFakeSegmentManager();
segmentDataManager._state.set(segmentDataManager, LLRealtimeSegmentDataManager.State.INITIAL_CONSUMING);
Assert.assertFalse(segmentDataManager.invokeEndCriteriaReached());
+ // We should still get false because no messages have been fetched
_timeNow += Fixtures.MAX_TIME_FOR_SEGMENT_CLOSE_MS + 1;
- // We should still get false, since the number of records in the realtime segment is 0
Assert.assertFalse(segmentDataManager.invokeEndCriteriaReached());
- replaceRealtimeSegment(segmentDataManager, 10);
- // Now we can test when we are far ahead in time
- _timeNow += Fixtures.MAX_TIME_FOR_SEGMENT_CLOSE_MS;
+ // Once there are messages fetched, and the time exceeds the extended hour, we should get true
+ setHasMessagesFetched(segmentDataManager, true);
+ _timeNow += TimeUnit.HOURS.toMillis(1);
Assert.assertTrue(segmentDataManager.invokeEndCriteriaReached());
Assert.assertEquals(segmentDataManager.getStopReason(), SegmentCompletionProtocol.REASON_TIME_LIMIT);
segmentDataManager.destroy();
@@ -579,14 +579,11 @@ public class LLRealtimeSegmentDataManagerTest {
}
}
- // Replace the realtime segment with a mock that returns numDocs for raw doc count.
- private void replaceRealtimeSegment(FakeLLRealtimeSegmentDataManager segmentDataManager, int numDocs)
+ private void setHasMessagesFetched(FakeLLRealtimeSegmentDataManager segmentDataManager, boolean hasMessagesFetched)
throws Exception {
- MutableSegmentImpl mockSegmentImpl = mock(MutableSegmentImpl.class);
- when(mockSegmentImpl.getNumDocsIndexed()).thenReturn(numDocs);
- Field segmentImpl = LLRealtimeSegmentDataManager.class.getDeclaredField("_realtimeSegment");
- segmentImpl.setAccessible(true);
- segmentImpl.set(segmentDataManager, mockSegmentImpl);
+ Field field = LLRealtimeSegmentDataManager.class.getDeclaredField("_hasMessagesFetched");
+ field.setAccessible(true);
+ field.set(segmentDataManager, hasMessagesFetched);
}
// If commit fails, make sure that we do not re-build the segment when we try to commit again.
@@ -743,7 +740,7 @@ public class LLRealtimeSegmentDataManagerTest {
public Field _state;
public Field _shouldStop;
public Field _stopReason;
- private Field _streamMsgOffsetFactory;
+ private final Field _streamMsgOffsetFactory;
public LinkedList<LongMsgOffset> _consumeOffsets = new LinkedList<>();
public LinkedList<SegmentCompletionProtocol.Response> _responses = new LinkedList<>();
public boolean _commitSegmentCalled = false;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org