You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xi...@apache.org on 2022/09/01 19:56:51 UTC
[pinot] 03/04: Add capabilities to ingest from another stream without disabling the realtime table (#9289)
This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch release-0.11-rc
in repository https://gitbox.apache.org/repos/asf/pinot.git
commit ce49c4f4cc293ae9629d36d59ef83cda2240c543
Author: Sajjad Moradi <mo...@gmail.com>
AuthorDate: Mon Aug 29 17:44:51 2022 -0700
Add capabilities to ingest from another stream without disabling the realtime table (#9289)
---
.../api/resources/PinotRealtimeTableResource.java | 20 ++--
.../realtime/PinotLLCRealtimeSegmentManager.java | 113 +++++++++++++--------
.../RealtimeSegmentValidationManager.java | 10 +-
.../PinotLLCRealtimeSegmentManagerTest.java | 5 +-
4 files changed, 98 insertions(+), 50 deletions(-)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java
index 7bf4ac4235..715d65bede 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java
@@ -31,6 +31,7 @@ import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
@@ -61,8 +62,7 @@ public class PinotRealtimeTableResource {
@POST
@Path("/tables/{tableName}/pauseConsumption")
@Produces(MediaType.APPLICATION_JSON)
- @ApiOperation(value = "Pause consumption of a realtime table",
- notes = "Pause the consumption of a realtime table")
+ @ApiOperation(value = "Pause consumption of a realtime table", notes = "Pause the consumption of a realtime table")
public Response pauseConsumption(
@ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName) {
String tableNameWithType = TableNameBuilder.REALTIME.tableNameWithType(tableName);
@@ -77,14 +77,22 @@ public class PinotRealtimeTableResource {
@POST
@Path("/tables/{tableName}/resumeConsumption")
@Produces(MediaType.APPLICATION_JSON)
- @ApiOperation(value = "Resume consumption of a realtime table",
- notes = "Resume the consumption for a realtime table")
+ @ApiOperation(value = "Resume consumption of a realtime table", notes =
+ "Resume the consumption for a realtime table. ConsumeFrom parameter indicates from which offsets "
+ + "consumption should resume. If consumeFrom parameter is not provided, consumption continues based on the "
+ + "offsets in segment ZK metadata, and in case the offsets are already gone, the first available offsets are "
+ + "picked to minimize the data loss.")
public Response resumeConsumption(
- @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName) {
+ @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName,
+ @ApiParam(value = "smallest | largest") @QueryParam("consumeFrom") String consumeFrom) {
String tableNameWithType = TableNameBuilder.REALTIME.tableNameWithType(tableName);
validate(tableNameWithType);
+ if (consumeFrom != null && !consumeFrom.equalsIgnoreCase("smallest") && !consumeFrom.equalsIgnoreCase("largest")) {
+ throw new ControllerApplicationException(LOGGER,
+ String.format("consumeFrom param '%s' is not valid.", consumeFrom), Response.Status.BAD_REQUEST);
+ }
try {
- return Response.ok(_pinotLLCRealtimeSegmentManager.resumeConsumption(tableNameWithType)).build();
+ return Response.ok(_pinotLLCRealtimeSegmentManager.resumeConsumption(tableNameWithType, consumeFrom)).build();
} catch (Exception e) {
throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR, e);
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 5cec6b8c4b..762ffdc421 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -873,7 +873,7 @@ public class PinotLLCRealtimeSegmentManager {
* which means it's manually triggered by admin not by automatic periodic task)
*/
public void ensureAllPartitionsConsuming(TableConfig tableConfig, PartitionLevelStreamConfig streamConfig,
- boolean recreateDeletedConsumingSegment) {
+ boolean recreateDeletedConsumingSegment, OffsetCriteria offsetCriteria) {
Preconditions.checkState(!_isStopping, "Segment manager is stopping");
String realtimeTableName = tableConfig.getTableName();
@@ -881,17 +881,20 @@ public class PinotLLCRealtimeSegmentManager {
assert idealState != null;
boolean isTableEnabled = idealState.isEnabled();
boolean isTablePaused = isTablePaused(idealState);
+ boolean offsetsHaveToChange = offsetCriteria != null;
if (isTableEnabled && !isTablePaused) {
List<PartitionGroupConsumptionStatus> currentPartitionGroupConsumptionStatusList =
- getPartitionGroupConsumptionStatusList(idealState, streamConfig);
- // Read the smallest offset when a new partition is detected
+ offsetsHaveToChange
+ ? Collections.emptyList() // offsets from metadata are not valid anymore; fetch for all partitions
+ : getPartitionGroupConsumptionStatusList(idealState, streamConfig);
OffsetCriteria originalOffsetCriteria = streamConfig.getOffsetCriteria();
- streamConfig.setOffsetCriteria(OffsetCriteria.SMALLEST_OFFSET_CRITERIA);
+ // Read the smallest offset when a new partition is detected
+ streamConfig.setOffsetCriteria(offsetsHaveToChange ? offsetCriteria : OffsetCriteria.SMALLEST_OFFSET_CRITERIA);
List<PartitionGroupMetadata> newPartitionGroupMetadataList =
getNewPartitionGroupMetadataList(streamConfig, currentPartitionGroupConsumptionStatusList);
streamConfig.setOffsetCriteria(originalOffsetCriteria);
return ensureAllPartitionsConsuming(tableConfig, streamConfig, idealState, newPartitionGroupMetadataList,
- recreateDeletedConsumingSegment);
+ recreateDeletedConsumingSegment, offsetCriteria);
} else {
LOGGER.info("Skipping LLC segments validation for table: {}, isTableEnabled: {}, isTablePaused: {}",
realtimeTableName, isTableEnabled, isTablePaused);
@@ -1052,7 +1055,7 @@ public class PinotLLCRealtimeSegmentManager {
@VisibleForTesting
IdealState ensureAllPartitionsConsuming(TableConfig tableConfig, PartitionLevelStreamConfig streamConfig,
IdealState idealState, List<PartitionGroupMetadata> newPartitionGroupMetadataList,
- boolean recreateDeletedConsumingSegment) {
+ boolean recreateDeletedConsumingSegment, OffsetCriteria offsetCriteria) {
String realtimeTableName = tableConfig.getTableName();
InstancePartitions instancePartitions = getConsumingInstancePartitions(tableConfig);
@@ -1074,6 +1077,16 @@ public class PinotLLCRealtimeSegmentManager {
// Get the latest segment ZK metadata for each partition
Map<Integer, SegmentZKMetadata> latestSegmentZKMetadataMap = getLatestSegmentZKMetadataMap(realtimeTableName);
+ // create a map of <partition id, start offset> using data already fetched in newPartitionGroupMetadataList
+ Map<Integer, StreamPartitionMsgOffset> partitionGroupIdToStartOffset = new HashMap<>();
+ for (PartitionGroupMetadata metadata : newPartitionGroupMetadataList) {
+ partitionGroupIdToStartOffset.put(metadata.getPartitionGroupId(), metadata.getStartOffset());
+ }
+ Map<Integer, StreamPartitionMsgOffset> partitionGroupIdToSmallestStreamOffset = null;
+ if (offsetCriteria == OffsetCriteria.SMALLEST_OFFSET_CRITERIA) {
+ partitionGroupIdToSmallestStreamOffset = partitionGroupIdToStartOffset;
+ }
+
// Walk over all partitions that we have metadata for, and repair any partitions necessary.
// Possible things to repair:
// 1. The latest metadata is in DONE state, but the idealstate says segment is CONSUMING:
@@ -1144,10 +1157,18 @@ public class PinotLLCRealtimeSegmentManager {
// 3. we should never end up with some replicas ONLINE and some OFFLINE.
if (isAllInstancesInState(instanceStateMap, SegmentStateModel.OFFLINE)) {
LOGGER.info("Repairing segment: {} which is OFFLINE for all instances in IdealState", latestSegmentName);
- StreamPartitionMsgOffset startOffset = offsetFactory.create(latestSegmentZKMetadata.getStartOffset());
+ if (partitionGroupIdToSmallestStreamOffset == null) {
+ // Smallest offset is fetched from stream once and stored in partitionGroupIdToSmallestStreamOffset.
+ // This is to prevent fetching the same info for each and every partition group.
+ partitionGroupIdToSmallestStreamOffset = fetchPartitionGroupIdToSmallestOffset(streamConfig);
+ }
+ StreamPartitionMsgOffset startOffset =
+ selectStartOffset(offsetCriteria, partitionGroupId, partitionGroupIdToStartOffset,
+ partitionGroupIdToSmallestStreamOffset, tableConfig.getTableName(), offsetFactory,
+ latestSegmentZKMetadata.getStartOffset()); // segments are OFFLINE; start from beginning
createNewConsumingSegment(tableConfig, streamConfig, latestSegmentZKMetadata, currentTimeMs,
- partitionGroupId, newPartitionGroupMetadataList, instancePartitions, instanceStatesMap,
- segmentAssignment, instancePartitionsMap, startOffset);
+ newPartitionGroupMetadataList, instancePartitions, instanceStatesMap, segmentAssignment,
+ instancePartitionsMap, startOffset);
} else {
if (newPartitionGroupSet.contains(partitionGroupId)) {
if (recreateDeletedConsumingSegment && latestSegmentZKMetadata.getStatus().isCompleted()
@@ -1155,10 +1176,18 @@ public class PinotLLCRealtimeSegmentManager {
// If we get here, that means in IdealState, the latest segment has all replicas ONLINE.
// Create a new IN_PROGRESS segment in PROPERTYSTORE,
// add it as CONSUMING segment to IDEALSTATE.
- StreamPartitionMsgOffset startOffset = offsetFactory.create(latestSegmentZKMetadata.getEndOffset());
+ if (partitionGroupIdToSmallestStreamOffset == null) {
+ // Smallest offset is fetched from stream once and stored in partitionGroupIdToSmallestStreamOffset.
+ // This is to prevent fetching the same info for each and every partition group.
+ partitionGroupIdToSmallestStreamOffset = fetchPartitionGroupIdToSmallestOffset(streamConfig);
+ }
+ StreamPartitionMsgOffset startOffset =
+ selectStartOffset(offsetCriteria, partitionGroupId, partitionGroupIdToStartOffset,
+ partitionGroupIdToSmallestStreamOffset, tableConfig.getTableName(), offsetFactory,
+ latestSegmentZKMetadata.getEndOffset());
createNewConsumingSegment(tableConfig, streamConfig, latestSegmentZKMetadata, currentTimeMs,
- partitionGroupId, newPartitionGroupMetadataList, instancePartitions, instanceStatesMap,
- segmentAssignment, instancePartitionsMap, startOffset);
+ newPartitionGroupMetadataList, instancePartitions, instanceStatesMap, segmentAssignment,
+ instancePartitionsMap, startOffset);
} else {
LOGGER.error("Got unexpected instance state map: {} for segment: {}", instanceStateMap,
latestSegmentName);
@@ -1220,7 +1249,7 @@ public class PinotLLCRealtimeSegmentManager {
}
private void createNewConsumingSegment(TableConfig tableConfig, PartitionLevelStreamConfig streamConfig,
- SegmentZKMetadata latestSegmentZKMetadata, long currentTimeMs, int partitionGroupId,
+ SegmentZKMetadata latestSegmentZKMetadata, long currentTimeMs,
List<PartitionGroupMetadata> newPartitionGroupMetadataList, InstancePartitions instancePartitions,
Map<String, Map<String, String>> instanceStatesMap, SegmentAssignment segmentAssignment,
Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap, StreamPartitionMsgOffset startOffset) {
@@ -1228,23 +1257,6 @@ public class PinotLLCRealtimeSegmentManager {
int numPartitions = newPartitionGroupMetadataList.size();
LLCSegmentName latestLLCSegmentName = new LLCSegmentName(latestSegmentZKMetadata.getSegmentName());
LLCSegmentName newLLCSegmentName = getNextLLCSegmentName(latestLLCSegmentName, currentTimeMs);
- StreamPartitionMsgOffset partitionGroupSmallestOffset =
- getPartitionGroupSmallestOffset(streamConfig, partitionGroupId);
-
- if (partitionGroupSmallestOffset != null) {
- // Start offset must be higher than the start offset of the stream
- if (partitionGroupSmallestOffset.compareTo(startOffset) > 0) {
- LOGGER.error("Data lost from offset: {} to: {} for partition: {} of table: {}", startOffset,
- partitionGroupSmallestOffset, partitionGroupId, tableConfig.getTableName());
- _controllerMetrics.addMeteredTableValue(tableConfig.getTableName(), ControllerMeter.LLC_STREAM_DATA_LOSS, 1L);
- startOffset = partitionGroupSmallestOffset;
- }
- } else {
- LOGGER.error("Smallest offset for partition: {} of table: {} not found. Using startOffset: {}", partitionGroupId,
- tableConfig.getTableName(), startOffset);
- _controllerMetrics.addMeteredTableValue(tableConfig.getTableName(), ControllerMeter.LLC_STREAM_DATA_LOSS, 1L);
- }
-
CommittingSegmentDescriptor committingSegmentDescriptor =
new CommittingSegmentDescriptor(latestSegmentZKMetadata.getSegmentName(), startOffset.toString(), 0);
createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName, currentTimeMs, committingSegmentDescriptor,
@@ -1254,21 +1266,39 @@ public class PinotLLCRealtimeSegmentManager {
instancePartitionsMap);
}
- @Nullable
- private StreamPartitionMsgOffset getPartitionGroupSmallestOffset(StreamConfig streamConfig, int partitionGroupId) {
+ private Map<Integer, StreamPartitionMsgOffset> fetchPartitionGroupIdToSmallestOffset(StreamConfig streamConfig) {
OffsetCriteria originalOffsetCriteria = streamConfig.getOffsetCriteria();
streamConfig.setOffsetCriteria(OffsetCriteria.SMALLEST_OFFSET_CRITERIA);
- List<PartitionGroupMetadata> smallestOffsetCriteriaPartitionGroupMetadata =
+ List<PartitionGroupMetadata> partitionGroupMetadataList =
getNewPartitionGroupMetadataList(streamConfig, Collections.emptyList());
streamConfig.setOffsetCriteria(originalOffsetCriteria);
- StreamPartitionMsgOffset partitionStartOffset = null;
- for (PartitionGroupMetadata info : smallestOffsetCriteriaPartitionGroupMetadata) {
- if (info.getPartitionGroupId() == partitionGroupId) {
- partitionStartOffset = info.getStartOffset();
- break;
+ Map<Integer, StreamPartitionMsgOffset> partitionGroupIdToSmallestOffset = new HashMap<>();
+ for (PartitionGroupMetadata metadata : partitionGroupMetadataList) {
+ partitionGroupIdToSmallestOffset.put(metadata.getPartitionGroupId(), metadata.getStartOffset());
+ }
+ return partitionGroupIdToSmallestOffset;
+ }
+
+ private StreamPartitionMsgOffset selectStartOffset(OffsetCriteria offsetCriteria, int partitionGroupId,
+ Map<Integer, StreamPartitionMsgOffset> partitionGroupIdToStartOffset,
+ Map<Integer, StreamPartitionMsgOffset> partitionGroupIdToSmallestStreamOffset, String tableName,
+ StreamPartitionMsgOffsetFactory offsetFactory, String startOffsetInSegmentZkMetadataStr) {
+ if (offsetCriteria != null) {
+ // use the fetched offset according to offset criteria
+ return partitionGroupIdToStartOffset.get(partitionGroupId);
+ } else {
+ // use offset from segment ZK metadata
+ StreamPartitionMsgOffset startOffsetInSegmentZkMetadata = offsetFactory.create(startOffsetInSegmentZkMetadataStr);
+ StreamPartitionMsgOffset streamSmallestOffset = partitionGroupIdToSmallestStreamOffset.get(partitionGroupId);
+ // Start offset in ZK must be higher than the start offset of the stream
+ if (streamSmallestOffset.compareTo(startOffsetInSegmentZkMetadata) > 0) {
+ LOGGER.error("Data lost from offset: {} to: {} for partition: {} of table: {}", startOffsetInSegmentZkMetadata,
+ streamSmallestOffset, partitionGroupId, tableName);
+ _controllerMetrics.addMeteredTableValue(tableName, ControllerMeter.LLC_STREAM_DATA_LOSS, 1L);
+ return streamSmallestOffset;
}
+ return startOffsetInSegmentZkMetadata;
}
- return partitionStartOffset;
}
private LLCSegmentName getNextLLCSegmentName(LLCSegmentName lastLLCSegmentName, long creationTimeMs) {
@@ -1448,12 +1478,15 @@ public class PinotLLCRealtimeSegmentManager {
* 1) setting "isTablePaused" in ideal states to false and
* 2) triggering segment validation job to create new consuming segments in ideal states
*/
- public PauseStatus resumeConsumption(String tableNameWithType) {
+ public PauseStatus resumeConsumption(String tableNameWithType, @Nullable String offsetCriteria) {
IdealState updatedIdealState = updatePauseStatusInIdealState(tableNameWithType, false);
// trigger realtime segment validation job to resume consumption
Map<String, String> taskProperties = new HashMap<>();
taskProperties.put(RealtimeSegmentValidationManager.RECREATE_DELETED_CONSUMING_SEGMENT_KEY, "true");
+ if (offsetCriteria != null) {
+ taskProperties.put(RealtimeSegmentValidationManager.OFFSET_CRITERIA, offsetCriteria);
+ }
_helixResourceManager
.invokeControllerPeriodicTask(tableNameWithType, Constants.REALTIME_SEGMENT_VALIDATION_MANAGER, taskProperties);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
index b772fcabd1..90c23361d1 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
@@ -34,6 +34,7 @@ import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
import org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.stream.OffsetCriteria;
import org.apache.pinot.spi.stream.PartitionLevelStreamConfig;
import org.apache.pinot.spi.utils.IngestionConfigUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
@@ -55,6 +56,7 @@ public class RealtimeSegmentValidationManager extends ControllerPeriodicTask<Rea
private long _lastSegmentLevelValidationRunTimeMs = 0L;
public static final String RECREATE_DELETED_CONSUMING_SEGMENT_KEY = "recreateDeletedConsumingSegment";
+ public static final String OFFSET_CRITERIA = "offsetCriteria";
public RealtimeSegmentValidationManager(ControllerConf config, PinotHelixResourceManager pinotHelixResourceManager,
LeadControllerManager leadControllerManager, PinotLLCRealtimeSegmentManager llcRealtimeSegmentManager,
@@ -82,7 +84,10 @@ public class RealtimeSegmentValidationManager extends ControllerPeriodicTask<Rea
}
context._recreateDeletedConsumingSegment =
Boolean.parseBoolean(periodicTaskProperties.getProperty(RECREATE_DELETED_CONSUMING_SEGMENT_KEY));
-
+ String offsetCriteriaStr = periodicTaskProperties.getProperty(OFFSET_CRITERIA);
+ if (offsetCriteriaStr != null) {
+ context._offsetCriteria = new OffsetCriteria.OffsetCriteriaBuilder().withOffsetString(offsetCriteriaStr);
+ }
return context;
}
@@ -106,7 +111,7 @@ public class RealtimeSegmentValidationManager extends ControllerPeriodicTask<Rea
if (streamConfig.hasLowLevelConsumerType()) {
_llcRealtimeSegmentManager.ensureAllPartitionsConsuming(tableConfig, streamConfig,
- context._recreateDeletedConsumingSegment);
+ context._recreateDeletedConsumingSegment, context._offsetCriteria);
}
}
@@ -178,6 +183,7 @@ public class RealtimeSegmentValidationManager extends ControllerPeriodicTask<Rea
public static final class Context {
private boolean _runSegmentLevelValidation;
private boolean _recreateDeletedConsumingSegment;
+ private OffsetCriteria _offsetCriteria;
}
@VisibleForTesting
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index 14ee8970ec..eee1f2a8a8 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -888,7 +888,8 @@ public class PinotLLCRealtimeSegmentManagerTest {
// Expected
}
try {
- segmentManager.ensureAllPartitionsConsuming(segmentManager._tableConfig, segmentManager._streamConfig, false);
+ segmentManager.ensureAllPartitionsConsuming(segmentManager._tableConfig, segmentManager._streamConfig, false,
+ null);
fail();
} catch (IllegalStateException e) {
// Expected
@@ -1115,7 +1116,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
public void ensureAllPartitionsConsuming() {
ensureAllPartitionsConsuming(_tableConfig, _streamConfig, _idealState,
- getNewPartitionGroupMetadataList(_streamConfig, Collections.emptyList()), false);
+ getNewPartitionGroupMetadataList(_streamConfig, Collections.emptyList()), false, null);
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org