You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2022/07/29 00:28:41 UTC
[pinot] branch master updated: Handle case when segment time threshold is crossed and 0 batch messages are processed (#9093)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 4084b6031b Handle case when segment time threshold is crossed and 0 batch messages are processed (#9093)
4084b6031b is described below
commit 4084b6031b8313cc77b515806485510f0351b8e7
Author: Daniel del Castillo <dd...@users.noreply.github.com>
AuthorDate: Fri Jul 29 01:28:36 2022 +0100
Handle case when segment time threshold is crossed and 0 batch messages are processed (#9093)
---
.../realtime/LLRealtimeSegmentDataManager.java | 52 ++++--
.../realtime/LLRealtimeSegmentDataManagerTest.java | 184 ++++++++++++++++++---
.../fakestream/FakePartitionLevelConsumer.java | 9 +-
.../impl/fakestream/FakeStreamConfigUtils.java | 3 +-
.../impl/fakestream/FakeStreamConsumerFactory.java | 2 +-
.../impl/fakestream/FakeStreamMessageBatch.java | 6 +
.../impl/fakestream/FakeStreamMessageDecoder.java | 7 +-
.../stream/kafka09/SimpleConsumerMessageBatch.java | 5 +
.../segment/local/segment/creator/Fixtures.java | 8 +-
9 files changed, 228 insertions(+), 48 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index bc0e6a5932..6e62e6665e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -427,7 +427,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
continue;
}
- processStreamEvents(messageBatch, idlePipeSleepTimeMillis);
+ boolean endCriteriaReached = processStreamEvents(messageBatch, idlePipeSleepTimeMillis);
if (_currentOffset.compareTo(lastUpdatedOffset) != 0) {
idle = false;
@@ -440,6 +440,14 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
}
_serverMetrics.setValueOfTableGauge(_metricKeyName, ServerGauge.LLC_PARTITION_CONSUMING, 1);
lastUpdatedOffset = _streamPartitionMsgOffsetFactory.create(_currentOffset);
+ } else if (endCriteriaReached) {
+ // At this point current offset has not moved because processStreamEvents() has exited before processing a
+ // single message
+ if (_segmentLogger.isDebugEnabled()) {
+ _segmentLogger.debug("No messages processed before end criteria was reached. Staying at offset {}",
+ _currentOffset);
+ }
+ // We check this flag again further down
} else if (messageBatch.getUnfilteredMessageCount() > 0) {
idle = false;
// we consumed something from the stream but filtered all the content out,
@@ -453,11 +461,11 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
} else {
// We did not consume any rows.
if (!idle) {
- idleStartTimeMillis = System.currentTimeMillis();
+ idleStartTimeMillis = now();
idle = true;
}
if (idleTimeoutMillis >= 0) {
- long totalIdleTimeMillis = System.currentTimeMillis() - idleStartTimeMillis;
+ long totalIdleTimeMillis = now() - idleStartTimeMillis;
if (totalIdleTimeMillis > idleTimeoutMillis) {
// Update the partition-consuming metric only if we have been idling beyond idle timeout.
// Create a new stream consumer wrapper, in case we are stuck on something.
@@ -469,6 +477,11 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
}
}
}
+
+ if (endCriteriaReached) {
+ // check this flag to avoid calling endCriteriaReached() at the beginning of the loop
+ break;
+ }
}
if (_numRowsErrored > 0) {
@@ -478,7 +491,13 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
return true;
}
- private void processStreamEvents(MessageBatch messagesAndOffsets, long idlePipeSleepTimeMillis) {
+ /**
+ * @param messagesAndOffsets batch of messages to process
+ * @param idlePipeSleepTimeMillis wait time in case no messages were read
+ * @return <code>true</code> if the process loop ended before processing the batch, <code>false</code>
+ * otherwise
+ */
+ private boolean processStreamEvents(MessageBatch messagesAndOffsets, long idlePipeSleepTimeMillis) {
int messageCount = messagesAndOffsets.getMessageCount();
_rateLimiter.throttle(messageCount);
@@ -492,8 +511,10 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
GenericRow reuse = new GenericRow();
TransformPipeline.Result reusedResult = new TransformPipeline.Result();
+ boolean prematureExit = false;
for (int index = 0; index < messageCount; index++) {
- if (_shouldStop || endCriteriaReached()) {
+ prematureExit = _shouldStop || endCriteriaReached();
+ if (prematureExit) {
if (_segmentLogger.isDebugEnabled()) {
_segmentLogger.debug("stop processing message batch early shouldStop: {}", _shouldStop);
}
@@ -540,7 +561,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
String errorMessage = String.format("Caught exception while transforming the record: %s", decodedRow);
_segmentLogger.error(errorMessage, e);
_realtimeTableDataManager.addSegmentError(_segmentNameStr,
- new SegmentErrorInfo(System.currentTimeMillis(), errorMessage, e));
+ new SegmentErrorInfo(now(), errorMessage, e));
}
if (reusedResult.getSkippedRowCount() > 0) {
realtimeRowsDroppedMeter =
@@ -559,7 +580,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
String errorMessage = String.format("Caught exception while indexing the record: %s", transformedRow);
_segmentLogger.error(errorMessage, e);
_realtimeTableDataManager.addSegmentError(_segmentNameStr,
- new SegmentErrorInfo(System.currentTimeMillis(), errorMessage, e));
+ new SegmentErrorInfo(now(), errorMessage, e));
}
}
} else {
@@ -580,13 +601,14 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
_segmentLogger.debug("Indexed {} messages ({} messages read from stream) current offset {}",
indexedMessageCount, streamMessageCount, _currentOffset);
}
- } else {
+ } else if (!prematureExit) {
if (_segmentLogger.isDebugEnabled()) {
_segmentLogger.debug("empty batch received - sleeping for {}ms", idlePipeSleepTimeMillis);
}
// If there were no messages to be fetched from stream, wait for a little bit as to avoid hammering the stream
Uninterruptibles.sleepUninterruptibly(idlePipeSleepTimeMillis, TimeUnit.MILLISECONDS);
}
+ return prematureExit;
}
public class PartitionConsumer implements Runnable {
@@ -705,7 +727,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
postStopConsumedMsg(e.getClass().getName());
_state = State.ERROR;
_realtimeTableDataManager
- .addSegmentError(_segmentNameStr, new SegmentErrorInfo(System.currentTimeMillis(), errorMessage, e));
+ .addSegmentError(_segmentNameStr, new SegmentErrorInfo(now(), errorMessage, e));
_serverMetrics.setValueOfTableGauge(_metricKeyName, ServerGauge.LLC_PARTITION_CONSUMING, 0);
return;
}
@@ -853,7 +875,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
_segmentLogger.error("Could not build segment", e);
FileUtils.deleteQuietly(tempSegmentFolder);
_realtimeTableDataManager.addSegmentError(_segmentNameStr,
- new SegmentErrorInfo(System.currentTimeMillis(), "Could not build segment", e));
+ new SegmentErrorInfo(now(), "Could not build segment", e));
return null;
}
final long buildTimeMillis = now() - lockAcquireTimeMillis;
@@ -875,7 +897,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
String.format("Caught exception while moving index directory from: %s to: %s", tempIndexDir, indexDir);
_segmentLogger.error(errorMessage, e);
_realtimeTableDataManager
- .addSegmentError(_segmentNameStr, new SegmentErrorInfo(System.currentTimeMillis(), errorMessage, e));
+ .addSegmentError(_segmentNameStr, new SegmentErrorInfo(now(), errorMessage, e));
return null;
} finally {
FileUtils.deleteQuietly(tempSegmentFolder);
@@ -896,7 +918,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
String.format("Caught exception while taring index directory from: %s to: %s", indexDir, segmentTarFile);
_segmentLogger.error(errorMessage, e);
_realtimeTableDataManager
- .addSegmentError(_segmentNameStr, new SegmentErrorInfo(System.currentTimeMillis(), errorMessage, e));
+ .addSegmentError(_segmentNameStr, new SegmentErrorInfo(now(), errorMessage, e));
return null;
}
@@ -906,7 +928,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
V1Constants.MetadataKeys.METADATA_FILE_NAME, indexDir);
_segmentLogger.error(errorMessage);
_realtimeTableDataManager
- .addSegmentError(_segmentNameStr, new SegmentErrorInfo(System.currentTimeMillis(), errorMessage, null));
+ .addSegmentError(_segmentNameStr, new SegmentErrorInfo(now(), errorMessage, null));
return null;
}
File creationMetaFile = SegmentDirectoryPaths.findCreationMetaFile(indexDir);
@@ -915,7 +937,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
V1Constants.SEGMENT_CREATION_META, indexDir);
_segmentLogger.error(errorMessage);
_realtimeTableDataManager
- .addSegmentError(_segmentNameStr, new SegmentErrorInfo(System.currentTimeMillis(), errorMessage, null));
+ .addSegmentError(_segmentNameStr, new SegmentErrorInfo(now(), errorMessage, null));
return null;
}
Map<String, File> metadataFiles = new HashMap<>();
@@ -932,7 +954,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
String errorMessage = "Interrupted while waiting for semaphore";
_segmentLogger.error(errorMessage, e);
_realtimeTableDataManager
- .addSegmentError(_segmentNameStr, new SegmentErrorInfo(System.currentTimeMillis(), errorMessage, e));
+ .addSegmentError(_segmentNameStr, new SegmentErrorInfo(now(), errorMessage, e));
return null;
} finally {
if (_segBuildSemaphore != null) {
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
index 50cf2a842f..ac8964bb32 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
@@ -28,7 +28,10 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
+import javax.annotation.Nullable;
import org.apache.commons.io.FileUtils;
import org.apache.helix.HelixManager;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
@@ -38,6 +41,7 @@ import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.common.utils.config.TableConfigUtils;
import org.apache.pinot.core.data.manager.offline.TableDataManagerProvider;
+import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConsumerFactory;
import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamMessageDecoder;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
@@ -53,6 +57,7 @@ import org.apache.pinot.spi.metrics.PinotMetricUtils;
import org.apache.pinot.spi.stream.LongMsgOffset;
import org.apache.pinot.spi.stream.LongMsgOffsetFactory;
import org.apache.pinot.spi.stream.PermanentConsumerException;
+import org.apache.pinot.spi.stream.StreamConfigProperties;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.util.TestUtils;
@@ -73,16 +78,15 @@ public class LLRealtimeSegmentDataManagerTest {
private static final String SEGMENT_DIR = "/tmp/" + LLRealtimeSegmentDataManagerTest.class.getSimpleName();
private static final File SEGMENT_DIR_FILE = new File(SEGMENT_DIR);
private static final String TABLE_NAME = "Coffee";
- private static final int PARTITION_GROUP_ID = 13;
+ private static final int PARTITION_GROUP_ID = 0;
private static final int SEQUENCE_ID = 945;
private static final long SEG_TIME_MS = 98347869999L;
private static final LLCSegmentName SEGMENT_NAME =
new LLCSegmentName(TABLE_NAME, PARTITION_GROUP_ID, SEQUENCE_ID, SEG_TIME_MS);
private static final String SEGMENT_NAME_STR = SEGMENT_NAME.getSegmentName();
- private static final long START_OFFSET_VALUE = 19885L;
+ private static final long START_OFFSET_VALUE = 198L;
private static final LongMsgOffset START_OFFSET = new LongMsgOffset(START_OFFSET_VALUE);
- private static long _timeNow = System.currentTimeMillis();
private final Map<Integer, Semaphore> _partitionGroupIdToSemaphoreMap = new ConcurrentHashMap<>();
private static TableConfig createTableConfig()
@@ -114,15 +118,32 @@ public class LLRealtimeSegmentDataManagerTest {
private FakeLLRealtimeSegmentDataManager createFakeSegmentManager()
throws Exception {
+ return createFakeSegmentManager(false, new TimeSupplier(), null, null);
+ }
+
+ private FakeLLRealtimeSegmentDataManager createFakeSegmentManager(boolean noUpsert, TimeSupplier timeSupplier,
+ @Nullable String maxRows, @Nullable String maxDuration)
+ throws Exception {
SegmentZKMetadata segmentZKMetadata = createZkMetadata();
TableConfig tableConfig = createTableConfig();
+ if (noUpsert) {
+ tableConfig.setUpsertConfig(null);
+ }
+ if (maxRows != null) {
+ tableConfig.getIndexingConfig().getStreamConfigs()
+ .put(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_ROWS, maxRows);
+ }
+ if (maxDuration != null) {
+ tableConfig.getIndexingConfig().getStreamConfigs()
+ .put(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_TIME, maxDuration);
+ }
RealtimeTableDataManager tableDataManager = createTableDataManager(tableConfig);
LLCSegmentName llcSegmentName = new LLCSegmentName(SEGMENT_NAME_STR);
_partitionGroupIdToSemaphoreMap.putIfAbsent(PARTITION_GROUP_ID, new Semaphore(1));
Schema schema = Fixtures.createSchema();
ServerMetrics serverMetrics = new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry());
return new FakeLLRealtimeSegmentDataManager(segmentZKMetadata, tableConfig, tableDataManager, SEGMENT_DIR, schema,
- llcSegmentName, _partitionGroupIdToSemaphoreMap, serverMetrics);
+ llcSegmentName, _partitionGroupIdToSemaphoreMap, serverMetrics, timeSupplier);
}
@BeforeClass
@@ -143,7 +164,7 @@ public class LLRealtimeSegmentDataManagerTest {
final String offset = "34";
FakeLLRealtimeSegmentDataManager segmentDataManager = createFakeSegmentManager();
{
- // Controller sends catchup response with both offset as well as streamPartitionMsgOffset
+ // Controller sends catchup response with both offset and streamPartitionMsgOffset
String responseStr =
"{" + " \"streamPartitionMsgOffset\" : \"" + offset + "\"," + " \"offset\" : " + offset + ","
+ " \"buildTimeSec\" : -1," + " \"isSplitCommitType\" : false,"
@@ -513,11 +534,11 @@ public class LLRealtimeSegmentDataManagerTest {
segmentDataManager._state.set(segmentDataManager, LLRealtimeSegmentDataManager.State.INITIAL_CONSUMING);
Assert.assertFalse(segmentDataManager.invokeEndCriteriaReached());
// We should still get false because there are no messages fetched
- _timeNow += Fixtures.MAX_TIME_FOR_SEGMENT_CLOSE_MS + 1;
+ segmentDataManager._timeSupplier.add(Fixtures.MAX_TIME_FOR_SEGMENT_CLOSE_MS + 1);
Assert.assertFalse(segmentDataManager.invokeEndCriteriaReached());
// Once there are messages fetched, and the time exceeds the extended hour, we should get true
setHasMessagesFetched(segmentDataManager, true);
- _timeNow += TimeUnit.HOURS.toMillis(1);
+ segmentDataManager._timeSupplier.add(TimeUnit.HOURS.toMillis(1));
Assert.assertTrue(segmentDataManager.invokeEndCriteriaReached());
Assert.assertEquals(segmentDataManager.getStopReason(), SegmentCompletionProtocol.REASON_TIME_LIMIT);
segmentDataManager.destroy();
@@ -537,7 +558,7 @@ public class LLRealtimeSegmentDataManagerTest {
// In catching up state, test reaching final offset ignoring time
{
FakeLLRealtimeSegmentDataManager segmentDataManager = createFakeSegmentManager();
- _timeNow += Fixtures.MAX_TIME_FOR_SEGMENT_CLOSE_MS;
+ segmentDataManager._timeSupplier.add(Fixtures.MAX_TIME_FOR_SEGMENT_CLOSE_MS);
segmentDataManager._state.set(segmentDataManager, LLRealtimeSegmentDataManager.State.CATCHING_UP);
final long finalOffset = START_OFFSET_VALUE + 100;
segmentDataManager.setFinalOffset(finalOffset);
@@ -551,9 +572,9 @@ public class LLRealtimeSegmentDataManagerTest {
// Case 1. We have reached final offset.
{
FakeLLRealtimeSegmentDataManager segmentDataManager = createFakeSegmentManager();
- _timeNow += 1;
+ segmentDataManager._timeSupplier.add(1);
segmentDataManager._state.set(segmentDataManager, LLRealtimeSegmentDataManager.State.CONSUMING_TO_ONLINE);
- segmentDataManager.setConsumeEndTime(_timeNow + 10);
+ segmentDataManager.setConsumeEndTime(segmentDataManager._timeSupplier.get() + 10);
final long finalOffset = START_OFFSET_VALUE + 100;
segmentDataManager.setFinalOffset(finalOffset);
segmentDataManager.setCurrentOffset(finalOffset - 1);
@@ -566,14 +587,14 @@ public class LLRealtimeSegmentDataManagerTest {
{
FakeLLRealtimeSegmentDataManager segmentDataManager = createFakeSegmentManager();
segmentDataManager._state.set(segmentDataManager, LLRealtimeSegmentDataManager.State.CONSUMING_TO_ONLINE);
- final long endTime = _timeNow + 10;
+ final long endTime = segmentDataManager._timeSupplier.get() + 10;
segmentDataManager.setConsumeEndTime(endTime);
final long finalOffset = START_OFFSET_VALUE + 100;
segmentDataManager.setFinalOffset(finalOffset);
segmentDataManager.setCurrentOffset(finalOffset - 1);
- _timeNow = endTime - 1;
+ segmentDataManager._timeSupplier.set(endTime - 1);
Assert.assertFalse(segmentDataManager.invokeEndCriteriaReached());
- _timeNow = endTime;
+ segmentDataManager._timeSupplier.set(endTime);
Assert.assertTrue(segmentDataManager.invokeEndCriteriaReached());
segmentDataManager.destroy();
}
@@ -735,12 +756,118 @@ public class LLRealtimeSegmentDataManagerTest {
Assert.assertFalse(SegmentBuildTimeLeaseExtender.isExecutorShutdown());
}
+ @Test
+ public void testShouldNotSkipUnfilteredMessagesIfNotIndexedAndTimeThresholdIsReached()
+ throws Exception {
+ final int segmentTimeThresholdMins = 10;
+ TimeSupplier timeSupplier = new TimeSupplier() {
+ @Override
+ public Long get() {
+ long now = System.currentTimeMillis();
+ // now() is called once in the run() method, once before each batch reading and once for every row indexation
+ if (_timeCheckCounter.incrementAndGet() <= FakeStreamConfigUtils.SEGMENT_FLUSH_THRESHOLD_ROWS + 4) {
+ return now;
+ }
+ // Exceed segment time threshold
+ return now + TimeUnit.MINUTES.toMillis(segmentTimeThresholdMins + 1);
+ }
+ };
+ FakeLLRealtimeSegmentDataManager segmentDataManager = createFakeSegmentManager(true, timeSupplier,
+ String.valueOf(FakeStreamConfigUtils.SEGMENT_FLUSH_THRESHOLD_ROWS * 2), segmentTimeThresholdMins + "m");
+ segmentDataManager._stubConsumeLoop = false;
+ segmentDataManager._state.set(segmentDataManager, LLRealtimeSegmentDataManager.State.INITIAL_CONSUMING);
+
+ LLRealtimeSegmentDataManager.PartitionConsumer consumer = segmentDataManager.createPartitionConsumer();
+ final LongMsgOffset endOffset =
+ new LongMsgOffset(START_OFFSET_VALUE + FakeStreamConfigUtils.SEGMENT_FLUSH_THRESHOLD_ROWS);
+ segmentDataManager._consumeOffsets.add(endOffset);
+ final SegmentCompletionProtocol.Response response = new SegmentCompletionProtocol.Response(
+ new SegmentCompletionProtocol.Response.Params().withStatus(
+ SegmentCompletionProtocol.ControllerResponseStatus.COMMIT)
+ .withStreamPartitionMsgOffset(endOffset.toString()));
+ segmentDataManager._responses.add(response);
+
+ consumer.run();
+
+ try {
+ // now() is called first in run before consumption, then once for each batch and once for each message in
+ // the batch, then once more when metrics are updated after each batch is processed and then 4 more times in
+ // run() after consume loop
+ Assert.assertEquals(timeSupplier._timeCheckCounter.get(), FakeStreamConfigUtils.SEGMENT_FLUSH_THRESHOLD_ROWS + 8);
+ Assert.assertEquals(((LongMsgOffset) segmentDataManager.getCurrentOffset()).getOffset(),
+ START_OFFSET_VALUE + FakeStreamConfigUtils.SEGMENT_FLUSH_THRESHOLD_ROWS);
+ Assert.assertEquals(segmentDataManager.getSegment().getNumDocsIndexed(),
+ FakeStreamConfigUtils.SEGMENT_FLUSH_THRESHOLD_ROWS);
+ Assert.assertEquals(segmentDataManager.getSegment().getSegmentMetadata().getTotalDocs(),
+ FakeStreamConfigUtils.SEGMENT_FLUSH_THRESHOLD_ROWS);
+ } finally {
+ segmentDataManager.destroy();
+ }
+ }
+
+ @Test
+ public void testShouldNotSkipUnfilteredMessagesIfNotIndexedAndRowCountThresholdIsReached()
+ throws Exception {
+ final int segmentTimeThresholdMins = 10;
+ TimeSupplier timeSupplier = new TimeSupplier();
+ FakeLLRealtimeSegmentDataManager segmentDataManager =
+ createFakeSegmentManager(true, timeSupplier, String.valueOf(FakeStreamConfigUtils.SEGMENT_FLUSH_THRESHOLD_ROWS),
+ segmentTimeThresholdMins + "m");
+ segmentDataManager._stubConsumeLoop = false;
+ segmentDataManager._state.set(segmentDataManager, LLRealtimeSegmentDataManager.State.INITIAL_CONSUMING);
+
+ LLRealtimeSegmentDataManager.PartitionConsumer consumer = segmentDataManager.createPartitionConsumer();
+ final LongMsgOffset endOffset =
+ new LongMsgOffset(START_OFFSET_VALUE + FakeStreamConfigUtils.SEGMENT_FLUSH_THRESHOLD_ROWS);
+ segmentDataManager._consumeOffsets.add(endOffset);
+ final SegmentCompletionProtocol.Response response = new SegmentCompletionProtocol.Response(
+ new SegmentCompletionProtocol.Response.Params().withStatus(
+ SegmentCompletionProtocol.ControllerResponseStatus.COMMIT)
+ .withStreamPartitionMsgOffset(endOffset.toString()));
+ segmentDataManager._responses.add(response);
+
+ consumer.run();
+
+ try {
+ // now() is called first in run before consumption, then once for each batch and once for each message in
+ // the batch, then once for metrics updates and then 4 more times in run() after consume loop
+ Assert.assertEquals(timeSupplier._timeCheckCounter.get(), FakeStreamConfigUtils.SEGMENT_FLUSH_THRESHOLD_ROWS + 6);
+ Assert.assertEquals(((LongMsgOffset) segmentDataManager.getCurrentOffset()).getOffset(),
+ START_OFFSET_VALUE + FakeStreamConfigUtils.SEGMENT_FLUSH_THRESHOLD_ROWS);
+ Assert.assertEquals(segmentDataManager.getSegment().getNumDocsIndexed(),
+ FakeStreamConfigUtils.SEGMENT_FLUSH_THRESHOLD_ROWS);
+ Assert.assertEquals(segmentDataManager.getSegment().getSegmentMetadata().getTotalDocs(),
+ FakeStreamConfigUtils.SEGMENT_FLUSH_THRESHOLD_ROWS);
+ } finally {
+ segmentDataManager.destroy();
+ }
+ }
+
+ private static class TimeSupplier implements Supplier<Long> {
+ protected final AtomicInteger _timeCheckCounter = new AtomicInteger();
+ protected long _timeNow = System.currentTimeMillis();
+
+ @Override
+ public Long get() {
+ _timeCheckCounter.incrementAndGet();
+ return _timeNow;
+ }
+
+ public void set(long millis) {
+ _timeNow = millis;
+ }
+
+ public void add(long millis) {
+ _timeNow += millis;
+ }
+ }
+
public static class FakeLLRealtimeSegmentDataManager extends LLRealtimeSegmentDataManager {
public Field _state;
public Field _shouldStop;
public Field _stopReason;
- private final Field _streamMsgOffsetFactory;
+ private Field _streamMsgOffsetFactory;
public LinkedList<LongMsgOffset> _consumeOffsets = new LinkedList<>();
public LinkedList<SegmentCompletionProtocol.Response> _responses = new LinkedList<>();
public boolean _commitSegmentCalled = false;
@@ -752,6 +879,8 @@ public class LLRealtimeSegmentDataManagerTest {
public boolean _throwExceptionFromConsume = false;
public boolean _postConsumeStoppedCalled = false;
public Map<Integer, Semaphore> _semaphoreMap;
+ public boolean _stubConsumeLoop = true;
+ private TimeSupplier _timeSupplier;
private static InstanceDataManagerConfig makeInstanceDataManagerConfig() {
InstanceDataManagerConfig dataManagerConfig = mock(InstanceDataManagerConfig.class);
@@ -766,7 +895,8 @@ public class LLRealtimeSegmentDataManagerTest {
public FakeLLRealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableConfig tableConfig,
RealtimeTableDataManager realtimeTableDataManager, String resourceDataDir, Schema schema,
- LLCSegmentName llcSegmentName, Map<Integer, Semaphore> semaphoreMap, ServerMetrics serverMetrics)
+ LLCSegmentName llcSegmentName, Map<Integer, Semaphore> semaphoreMap, ServerMetrics serverMetrics,
+ TimeSupplier timeSupplier)
throws Exception {
super(segmentZKMetadata, tableConfig, realtimeTableDataManager, resourceDataDir,
new IndexLoadingConfig(makeInstanceDataManagerConfig(), tableConfig), schema, llcSegmentName,
@@ -781,6 +911,7 @@ public class LLRealtimeSegmentDataManagerTest {
_streamMsgOffsetFactory = LLRealtimeSegmentDataManager.class.getDeclaredField("_streamPartitionMsgOffsetFactory");
_streamMsgOffsetFactory.setAccessible(true);
_streamMsgOffsetFactory.set(this, new LongMsgOffsetFactory());
+ _timeSupplier = timeSupplier;
}
public String getStopReason() {
@@ -826,12 +957,15 @@ public class LLRealtimeSegmentDataManagerTest {
@Override
protected boolean consumeLoop()
throws Exception {
- if (_throwExceptionFromConsume) {
- throw new PermanentConsumerException(new Throwable("Offset out of range"));
+ if (_stubConsumeLoop) {
+ if (_throwExceptionFromConsume) {
+ throw new PermanentConsumerException(new Throwable("Offset out of range"));
+ }
+ setCurrentOffset(_consumeOffsets.remove().getOffset());
+ terminateLoopIfNecessary();
+ return true;
}
- setCurrentOffset(_consumeOffsets.remove().getOffset());
- terminateLoopIfNecessary();
- return true;
+ return super.consumeLoop();
}
@Override
@@ -854,12 +988,16 @@ public class LLRealtimeSegmentDataManagerTest {
@Override
protected long now() {
- return _timeNow;
+ // now() is called in the constructor before _timeSupplier is set
+ if (_timeSupplier == null) {
+ return System.currentTimeMillis();
+ }
+ return _timeSupplier.get();
}
@Override
protected void hold() {
- _timeNow += 5000L;
+ _timeSupplier.add(5000L);
}
@Override
@@ -899,7 +1037,7 @@ public class LLRealtimeSegmentDataManagerTest {
@Override
public void stop() {
- _timeNow += _stopWaitTimeMs;
+ _timeSupplier.add(_stopWaitTimeMs);
}
public void setCurrentOffset(long offset) {
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakePartitionLevelConsumer.java b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakePartitionLevelConsumer.java
index 7f2e603231..cbb77066f2 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakePartitionLevelConsumer.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakePartitionLevelConsumer.java
@@ -53,9 +53,10 @@ public class FakePartitionLevelConsumer implements PartitionLevelConsumer {
private List<Integer> _messageOffsets = new ArrayList<>();
private List<byte[]> _messageBytes = new ArrayList<>();
+ private final int _defaultBatchSize;
- FakePartitionLevelConsumer(int partition, StreamConfig streamConfig) {
-
+ FakePartitionLevelConsumer(int partition, StreamConfig streamConfig, int defaultBatchSize) {
+ _defaultBatchSize = defaultBatchSize;
// TODO: this logic can move to a FakeStreamProducer instead of being inside the Consumer
File tempDir = new File(FileUtils.getTempDirectory(), getClass().getSimpleName());
File outputDir = new File(tempDir, String.valueOf(partition));
@@ -114,6 +115,10 @@ public class FakePartitionLevelConsumer implements PartitionLevelConsumer {
}
int startOffsetInt = (int) ((LongMsgOffset) startOffset).getOffset();
int endOffsetInt = (int) ((LongMsgOffset) endOffset).getOffset();
+ if (endOffsetInt > _messageOffsets.size() && _defaultBatchSize > 0) {
+ // Hack to get multiple batches
+ endOffsetInt = startOffsetInt + _defaultBatchSize;
+ }
return new FakeStreamMessageBatch(_messageOffsets.subList(startOffsetInt, endOffsetInt),
_messageBytes.subList(startOffsetInt, endOffsetInt));
}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConfigUtils.java b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConfigUtils.java
index d84261cd44..75de0fbee9 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConfigUtils.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConfigUtils.java
@@ -63,7 +63,8 @@ public class FakeStreamConfigUtils {
private static final String CONSUMER_FACTORY_CLASS = FakeStreamConsumerFactory.class.getName();
private static final String OFFSET_CRITERIA = "smallest";
private static final String DECODER_CLASS = FakeStreamMessageDecoder.class.getName();
- private static final int SEGMENT_FLUSH_THRESHOLD_ROWS = 500;
+ public static final int SEGMENT_FLUSH_THRESHOLD_ROWS = 500;
+ public static final int MESSAGE_BATCH_SIZE = SEGMENT_FLUSH_THRESHOLD_ROWS;
/**
* Gets default num partitions
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java
index e4c5301215..1923a4d822 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java
@@ -47,7 +47,7 @@ public class FakeStreamConsumerFactory extends StreamConsumerFactory {
@Override
public PartitionLevelConsumer createPartitionLevelConsumer(String clientId, int partition) {
- return new FakePartitionLevelConsumer(partition, _streamConfig);
+ return new FakePartitionLevelConsumer(partition, _streamConfig, FakeStreamConfigUtils.MESSAGE_BATCH_SIZE);
}
@Override
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMessageBatch.java b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMessageBatch.java
index 8348435f1a..134aac9fd8 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMessageBatch.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMessageBatch.java
@@ -36,26 +36,32 @@ public class FakeStreamMessageBatch implements MessageBatch<byte[]> {
_messageBytes = messageBytes;
}
+ @Override
public int getMessageCount() {
return _messageOffsets.size();
}
+ @Override
public byte[] getMessageAtIndex(int index) {
return _messageBytes.get(index);
}
+ @Override
public int getMessageOffsetAtIndex(int index) {
return _messageOffsets.get(index);
}
+ @Override
public int getMessageLengthAtIndex(int index) {
return _messageBytes.get(index).length;
}
+ @Override
public long getNextStreamMessageOffsetAtIndex(int index) {
throw new UnsupportedOperationException("This method is deprecated");
}
+ @Override
public StreamPartitionMsgOffset getNextStreamPartitionMsgOffsetAtIndex(int index) {
return new LongMsgOffset(_messageOffsets.get(index) + 1);
}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMessageDecoder.java b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMessageDecoder.java
index 10f358fb8a..07eacfbf3e 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMessageDecoder.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMessageDecoder.java
@@ -20,6 +20,7 @@ package org.apache.pinot.core.realtime.impl.fakestream;
import java.util.Map;
import java.util.Set;
+import org.apache.pinot.segment.local.segment.creator.Fixtures;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.stream.StreamMessageDecoder;
@@ -35,11 +36,13 @@ public class FakeStreamMessageDecoder implements StreamMessageDecoder<byte[]> {
@Override
public GenericRow decode(byte[] payload, GenericRow destination) {
- return null;
+ GenericRow row = Fixtures.createSingleRow(System.currentTimeMillis());
+ destination.init(row);
+ return destination;
}
@Override
public GenericRow decode(byte[] payload, int offset, int length, GenericRow destination) {
- return null;
+ return decode(payload, destination);
}
}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/SimpleConsumerMessageBatch.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/SimpleConsumerMessageBatch.java
index b53546c1b8..5e36a22d09 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/SimpleConsumerMessageBatch.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/SimpleConsumerMessageBatch.java
@@ -35,18 +35,22 @@ public class SimpleConsumerMessageBatch implements MessageBatch<byte[]> {
}
}
+ @Override
public int getMessageCount() {
return _messageList.size();
}
+ @Override
public byte[] getMessageAtIndex(int index) {
return _messageList.get(index).message().payload().array();
}
+ @Override
public int getMessageOffsetAtIndex(int index) {
return _messageList.get(index).message().payload().arrayOffset();
}
+ @Override
public int getMessageLengthAtIndex(int index) {
return _messageList.get(index).message().payloadSize();
}
@@ -56,6 +60,7 @@ public class SimpleConsumerMessageBatch implements MessageBatch<byte[]> {
throw new UnsupportedOperationException("This method is deprecated");
}
+ @Override
public StreamPartitionMsgOffset getNextStreamPartitionMsgOffsetAtIndex(int index) {
return new LongMsgOffset(_messageList.get(index).nextOffset());
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/Fixtures.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/Fixtures.java
index 934c097b1a..3b1f9ba22b 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/Fixtures.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/Fixtures.java
@@ -109,7 +109,7 @@ public class Fixtures {
return Schema.fromString(SCHEMA_JSON);
}
- public static GenericRow createSingleRow(int randomSeed) {
+ public static GenericRow createSingleRow(long randomSeed) {
Random rand = new Random(randomSeed);
int colValue = rand.nextInt(Integer.MAX_VALUE);
GenericRow retVal = new GenericRow();
@@ -120,7 +120,7 @@ public class Fixtures {
return retVal;
}
- public static GenericRow createInvalidSingleRow(int randomSeed) {
+ public static GenericRow createInvalidSingleRow(long randomSeed) {
Random rand = new Random(randomSeed);
int colValue = rand.nextInt(Integer.MAX_VALUE);
GenericRow retVal = new GenericRow();
@@ -131,7 +131,7 @@ public class Fixtures {
return retVal;
}
- public static GenericRow createMultipleRow(int randomSeed) {
+ public static GenericRow createMultipleRow(long randomSeed) {
Random rand = new Random(randomSeed);
GenericRow firstRow = createSingleRow(randomSeed);
GenericRow secondRow = createSingleRow(rand.nextInt(Integer.MAX_VALUE));
@@ -141,7 +141,7 @@ public class Fixtures {
return retVal;
}
- public static GenericRow createMultipleRowPartialFailure(int randomSeed) {
+ public static GenericRow createMultipleRowPartialFailure(long randomSeed) {
Random rand = new Random(randomSeed);
GenericRow firstRow = createSingleRow(randomSeed);
GenericRow secondRow = createInvalidSingleRow(rand.nextInt(Integer.MAX_VALUE));
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org