You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xi...@apache.org on 2022/09/01 19:56:51 UTC

[pinot] 03/04: Add capabilities to ingest from another stream without disabling the realtime table (#9289)

This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch release-0.11-rc
in repository https://gitbox.apache.org/repos/asf/pinot.git

commit ce49c4f4cc293ae9629d36d59ef83cda2240c543
Author: Sajjad Moradi <mo...@gmail.com>
AuthorDate: Mon Aug 29 17:44:51 2022 -0700

    Add capabilities to ingest from another stream without disabling the realtime table (#9289)
---
 .../api/resources/PinotRealtimeTableResource.java  |  20 ++--
 .../realtime/PinotLLCRealtimeSegmentManager.java   | 113 +++++++++++++--------
 .../RealtimeSegmentValidationManager.java          |  10 +-
 .../PinotLLCRealtimeSegmentManagerTest.java        |   5 +-
 4 files changed, 98 insertions(+), 50 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java
index 7bf4ac4235..715d65bede 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java
@@ -31,6 +31,7 @@ import javax.ws.rs.POST;
 import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
 import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.HttpHeaders;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
@@ -61,8 +62,7 @@ public class PinotRealtimeTableResource {
   @POST
   @Path("/tables/{tableName}/pauseConsumption")
   @Produces(MediaType.APPLICATION_JSON)
-  @ApiOperation(value = "Pause consumption of a realtime table",
-      notes = "Pause the consumption of a realtime table")
+  @ApiOperation(value = "Pause consumption of a realtime table", notes = "Pause the consumption of a realtime table")
   public Response pauseConsumption(
       @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName) {
     String tableNameWithType = TableNameBuilder.REALTIME.tableNameWithType(tableName);
@@ -77,14 +77,22 @@ public class PinotRealtimeTableResource {
   @POST
   @Path("/tables/{tableName}/resumeConsumption")
   @Produces(MediaType.APPLICATION_JSON)
-  @ApiOperation(value = "Resume consumption of a realtime table",
-      notes = "Resume the consumption for a realtime table")
+  @ApiOperation(value = "Resume consumption of a realtime table", notes =
+      "Resume the consumption for a realtime table. ConsumeFrom parameter indicates from which offsets "
+          + "consumption should resume. If consumeFrom parameter is not provided, consumption continues based on the "
+          + "offsets in segment ZK metadata, and in case the offsets are already gone, the first available offsets are "
+          + "picked to minimize the data loss.")
   public Response resumeConsumption(
-      @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName) {
+      @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName,
+      @ApiParam(value = "smallest | largest") @QueryParam("consumeFrom") String consumeFrom) {
     String tableNameWithType = TableNameBuilder.REALTIME.tableNameWithType(tableName);
     validate(tableNameWithType);
+    if (consumeFrom != null && !consumeFrom.equalsIgnoreCase("smallest") && !consumeFrom.equalsIgnoreCase("largest")) {
+      throw new ControllerApplicationException(LOGGER,
+          String.format("consumeFrom param '%s' is not valid.", consumeFrom), Response.Status.BAD_REQUEST);
+    }
     try {
-      return Response.ok(_pinotLLCRealtimeSegmentManager.resumeConsumption(tableNameWithType)).build();
+      return Response.ok(_pinotLLCRealtimeSegmentManager.resumeConsumption(tableNameWithType, consumeFrom)).build();
     } catch (Exception e) {
       throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR, e);
     }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 5cec6b8c4b..762ffdc421 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -873,7 +873,7 @@ public class PinotLLCRealtimeSegmentManager {
    * which means it's manually triggered by admin not by automatic periodic task)
    */
   public void ensureAllPartitionsConsuming(TableConfig tableConfig, PartitionLevelStreamConfig streamConfig,
-      boolean recreateDeletedConsumingSegment) {
+      boolean recreateDeletedConsumingSegment, OffsetCriteria offsetCriteria) {
     Preconditions.checkState(!_isStopping, "Segment manager is stopping");
 
     String realtimeTableName = tableConfig.getTableName();
@@ -881,17 +881,20 @@ public class PinotLLCRealtimeSegmentManager {
       assert idealState != null;
       boolean isTableEnabled = idealState.isEnabled();
       boolean isTablePaused = isTablePaused(idealState);
+      boolean offsetsHaveToChange = offsetCriteria != null;
       if (isTableEnabled && !isTablePaused) {
         List<PartitionGroupConsumptionStatus> currentPartitionGroupConsumptionStatusList =
-            getPartitionGroupConsumptionStatusList(idealState, streamConfig);
-        // Read the smallest offset when a new partition is detected
+            offsetsHaveToChange
+                ? Collections.emptyList() // offsets from metadata are not valid anymore; fetch for all partitions
+                : getPartitionGroupConsumptionStatusList(idealState, streamConfig);
         OffsetCriteria originalOffsetCriteria = streamConfig.getOffsetCriteria();
-        streamConfig.setOffsetCriteria(OffsetCriteria.SMALLEST_OFFSET_CRITERIA);
+        // Read the smallest offset when a new partition is detected
+        streamConfig.setOffsetCriteria(offsetsHaveToChange ? offsetCriteria : OffsetCriteria.SMALLEST_OFFSET_CRITERIA);
         List<PartitionGroupMetadata> newPartitionGroupMetadataList =
             getNewPartitionGroupMetadataList(streamConfig, currentPartitionGroupConsumptionStatusList);
         streamConfig.setOffsetCriteria(originalOffsetCriteria);
         return ensureAllPartitionsConsuming(tableConfig, streamConfig, idealState, newPartitionGroupMetadataList,
-            recreateDeletedConsumingSegment);
+            recreateDeletedConsumingSegment, offsetCriteria);
       } else {
         LOGGER.info("Skipping LLC segments validation for table: {}, isTableEnabled: {}, isTablePaused: {}",
             realtimeTableName, isTableEnabled, isTablePaused);
@@ -1052,7 +1055,7 @@ public class PinotLLCRealtimeSegmentManager {
   @VisibleForTesting
   IdealState ensureAllPartitionsConsuming(TableConfig tableConfig, PartitionLevelStreamConfig streamConfig,
       IdealState idealState, List<PartitionGroupMetadata> newPartitionGroupMetadataList,
-      boolean recreateDeletedConsumingSegment) {
+      boolean recreateDeletedConsumingSegment, OffsetCriteria offsetCriteria) {
     String realtimeTableName = tableConfig.getTableName();
 
     InstancePartitions instancePartitions = getConsumingInstancePartitions(tableConfig);
@@ -1074,6 +1077,16 @@ public class PinotLLCRealtimeSegmentManager {
     // Get the latest segment ZK metadata for each partition
     Map<Integer, SegmentZKMetadata> latestSegmentZKMetadataMap = getLatestSegmentZKMetadataMap(realtimeTableName);
 
+    // create a map of <partition id, start offset> using data already fetched in newPartitionGroupMetadataList
+    Map<Integer, StreamPartitionMsgOffset> partitionGroupIdToStartOffset = new HashMap<>();
+    for (PartitionGroupMetadata metadata : newPartitionGroupMetadataList) {
+      partitionGroupIdToStartOffset.put(metadata.getPartitionGroupId(), metadata.getStartOffset());
+    }
+    Map<Integer, StreamPartitionMsgOffset> partitionGroupIdToSmallestStreamOffset = null;
+    if (offsetCriteria == OffsetCriteria.SMALLEST_OFFSET_CRITERIA) {
+      partitionGroupIdToSmallestStreamOffset = partitionGroupIdToStartOffset;
+    }
+
     // Walk over all partitions that we have metadata for, and repair any partitions necessary.
     // Possible things to repair:
     // 1. The latest metadata is in DONE state, but the idealstate says segment is CONSUMING:
@@ -1144,10 +1157,18 @@ public class PinotLLCRealtimeSegmentManager {
           // 3. we should never end up with some replicas ONLINE and some OFFLINE.
           if (isAllInstancesInState(instanceStateMap, SegmentStateModel.OFFLINE)) {
             LOGGER.info("Repairing segment: {} which is OFFLINE for all instances in IdealState", latestSegmentName);
-            StreamPartitionMsgOffset startOffset = offsetFactory.create(latestSegmentZKMetadata.getStartOffset());
+            if (partitionGroupIdToSmallestStreamOffset == null) {
+              // Smallest offset is fetched from stream once and stored in partitionGroupIdToSmallestStreamOffset.
+              // This is to prevent fetching the same info for each and every partition group.
+              partitionGroupIdToSmallestStreamOffset = fetchPartitionGroupIdToSmallestOffset(streamConfig);
+            }
+            StreamPartitionMsgOffset startOffset =
+                selectStartOffset(offsetCriteria, partitionGroupId, partitionGroupIdToStartOffset,
+                    partitionGroupIdToSmallestStreamOffset, tableConfig.getTableName(), offsetFactory,
+                    latestSegmentZKMetadata.getStartOffset()); // segments are OFFLINE; start from beginning
             createNewConsumingSegment(tableConfig, streamConfig, latestSegmentZKMetadata, currentTimeMs,
-                partitionGroupId, newPartitionGroupMetadataList, instancePartitions, instanceStatesMap,
-                segmentAssignment, instancePartitionsMap, startOffset);
+                newPartitionGroupMetadataList, instancePartitions, instanceStatesMap, segmentAssignment,
+                instancePartitionsMap, startOffset);
           } else {
             if (newPartitionGroupSet.contains(partitionGroupId)) {
               if (recreateDeletedConsumingSegment && latestSegmentZKMetadata.getStatus().isCompleted()
@@ -1155,10 +1176,18 @@ public class PinotLLCRealtimeSegmentManager {
                 // If we get here, that means in IdealState, the latest segment has all replicas ONLINE.
                 // Create a new IN_PROGRESS segment in PROPERTYSTORE,
                 // add it as CONSUMING segment to IDEALSTATE.
-                StreamPartitionMsgOffset startOffset = offsetFactory.create(latestSegmentZKMetadata.getEndOffset());
+                if (partitionGroupIdToSmallestStreamOffset == null) {
+                  // Smallest offset is fetched from stream once and stored in partitionGroupIdToSmallestStreamOffset.
+                  // This is to prevent fetching the same info for each and every partition group.
+                  partitionGroupIdToSmallestStreamOffset = fetchPartitionGroupIdToSmallestOffset(streamConfig);
+                }
+                StreamPartitionMsgOffset startOffset =
+                    selectStartOffset(offsetCriteria, partitionGroupId, partitionGroupIdToStartOffset,
+                        partitionGroupIdToSmallestStreamOffset, tableConfig.getTableName(), offsetFactory,
+                        latestSegmentZKMetadata.getEndOffset());
                 createNewConsumingSegment(tableConfig, streamConfig, latestSegmentZKMetadata, currentTimeMs,
-                    partitionGroupId, newPartitionGroupMetadataList, instancePartitions, instanceStatesMap,
-                    segmentAssignment, instancePartitionsMap, startOffset);
+                    newPartitionGroupMetadataList, instancePartitions, instanceStatesMap, segmentAssignment,
+                    instancePartitionsMap, startOffset);
               } else {
                 LOGGER.error("Got unexpected instance state map: {} for segment: {}", instanceStateMap,
                     latestSegmentName);
@@ -1220,7 +1249,7 @@ public class PinotLLCRealtimeSegmentManager {
   }
 
   private void createNewConsumingSegment(TableConfig tableConfig, PartitionLevelStreamConfig streamConfig,
-      SegmentZKMetadata latestSegmentZKMetadata, long currentTimeMs, int partitionGroupId,
+      SegmentZKMetadata latestSegmentZKMetadata, long currentTimeMs,
       List<PartitionGroupMetadata> newPartitionGroupMetadataList, InstancePartitions instancePartitions,
       Map<String, Map<String, String>> instanceStatesMap, SegmentAssignment segmentAssignment,
       Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap, StreamPartitionMsgOffset startOffset) {
@@ -1228,23 +1257,6 @@ public class PinotLLCRealtimeSegmentManager {
     int numPartitions = newPartitionGroupMetadataList.size();
     LLCSegmentName latestLLCSegmentName = new LLCSegmentName(latestSegmentZKMetadata.getSegmentName());
     LLCSegmentName newLLCSegmentName = getNextLLCSegmentName(latestLLCSegmentName, currentTimeMs);
-    StreamPartitionMsgOffset partitionGroupSmallestOffset =
-        getPartitionGroupSmallestOffset(streamConfig, partitionGroupId);
-
-    if (partitionGroupSmallestOffset != null) {
-      // Start offset must be higher than the start offset of the stream
-      if (partitionGroupSmallestOffset.compareTo(startOffset) > 0) {
-        LOGGER.error("Data lost from offset: {} to: {} for partition: {} of table: {}", startOffset,
-            partitionGroupSmallestOffset, partitionGroupId, tableConfig.getTableName());
-        _controllerMetrics.addMeteredTableValue(tableConfig.getTableName(), ControllerMeter.LLC_STREAM_DATA_LOSS, 1L);
-        startOffset = partitionGroupSmallestOffset;
-      }
-    } else {
-      LOGGER.error("Smallest offset for partition: {} of table: {} not found. Using startOffset: {}", partitionGroupId,
-          tableConfig.getTableName(), startOffset);
-      _controllerMetrics.addMeteredTableValue(tableConfig.getTableName(), ControllerMeter.LLC_STREAM_DATA_LOSS, 1L);
-    }
-
     CommittingSegmentDescriptor committingSegmentDescriptor =
         new CommittingSegmentDescriptor(latestSegmentZKMetadata.getSegmentName(), startOffset.toString(), 0);
     createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName, currentTimeMs, committingSegmentDescriptor,
@@ -1254,21 +1266,39 @@ public class PinotLLCRealtimeSegmentManager {
         instancePartitionsMap);
   }
 
-  @Nullable
-  private StreamPartitionMsgOffset getPartitionGroupSmallestOffset(StreamConfig streamConfig, int partitionGroupId) {
+  private Map<Integer, StreamPartitionMsgOffset> fetchPartitionGroupIdToSmallestOffset(StreamConfig streamConfig) {
     OffsetCriteria originalOffsetCriteria = streamConfig.getOffsetCriteria();
     streamConfig.setOffsetCriteria(OffsetCriteria.SMALLEST_OFFSET_CRITERIA);
-    List<PartitionGroupMetadata> smallestOffsetCriteriaPartitionGroupMetadata =
+    List<PartitionGroupMetadata> partitionGroupMetadataList =
         getNewPartitionGroupMetadataList(streamConfig, Collections.emptyList());
     streamConfig.setOffsetCriteria(originalOffsetCriteria);
-    StreamPartitionMsgOffset partitionStartOffset = null;
-    for (PartitionGroupMetadata info : smallestOffsetCriteriaPartitionGroupMetadata) {
-      if (info.getPartitionGroupId() == partitionGroupId) {
-        partitionStartOffset = info.getStartOffset();
-        break;
+    Map<Integer, StreamPartitionMsgOffset> partitionGroupIdToSmallestOffset = new HashMap<>();
+    for (PartitionGroupMetadata metadata : partitionGroupMetadataList) {
+      partitionGroupIdToSmallestOffset.put(metadata.getPartitionGroupId(), metadata.getStartOffset());
+    }
+    return partitionGroupIdToSmallestOffset;
+  }
+
+  private StreamPartitionMsgOffset selectStartOffset(OffsetCriteria offsetCriteria, int partitionGroupId,
+      Map<Integer, StreamPartitionMsgOffset> partitionGroupIdToStartOffset,
+      Map<Integer, StreamPartitionMsgOffset> partitionGroupIdToSmallestStreamOffset, String tableName,
+      StreamPartitionMsgOffsetFactory offsetFactory, String startOffsetInSegmentZkMetadataStr) {
+    if (offsetCriteria != null) {
+      // use the fetched offset according to offset criteria
+      return partitionGroupIdToStartOffset.get(partitionGroupId);
+    } else {
+      // use offset from segment ZK metadata
+      StreamPartitionMsgOffset startOffsetInSegmentZkMetadata = offsetFactory.create(startOffsetInSegmentZkMetadataStr);
+      StreamPartitionMsgOffset streamSmallestOffset = partitionGroupIdToSmallestStreamOffset.get(partitionGroupId);
+      // Start offset in ZK must be higher than the start offset of the stream
+      if (streamSmallestOffset.compareTo(startOffsetInSegmentZkMetadata) > 0) {
+        LOGGER.error("Data lost from offset: {} to: {} for partition: {} of table: {}", startOffsetInSegmentZkMetadata,
+            streamSmallestOffset, partitionGroupId, tableName);
+        _controllerMetrics.addMeteredTableValue(tableName, ControllerMeter.LLC_STREAM_DATA_LOSS, 1L);
+        return streamSmallestOffset;
       }
+      return startOffsetInSegmentZkMetadata;
     }
-    return partitionStartOffset;
   }
 
   private LLCSegmentName getNextLLCSegmentName(LLCSegmentName lastLLCSegmentName, long creationTimeMs) {
@@ -1448,12 +1478,15 @@ public class PinotLLCRealtimeSegmentManager {
    *   1) setting "isTablePaused" in ideal states to false and
    *   2) triggering segment validation job to create new consuming segments in ideal states
    */
-  public PauseStatus resumeConsumption(String tableNameWithType) {
+  public PauseStatus resumeConsumption(String tableNameWithType, @Nullable String offsetCriteria) {
     IdealState updatedIdealState = updatePauseStatusInIdealState(tableNameWithType, false);
 
     // trigger realtime segment validation job to resume consumption
     Map<String, String> taskProperties = new HashMap<>();
     taskProperties.put(RealtimeSegmentValidationManager.RECREATE_DELETED_CONSUMING_SEGMENT_KEY, "true");
+    if (offsetCriteria != null) {
+      taskProperties.put(RealtimeSegmentValidationManager.OFFSET_CRITERIA, offsetCriteria);
+    }
     _helixResourceManager
         .invokeControllerPeriodicTask(tableNameWithType, Constants.REALTIME_SEGMENT_VALIDATION_MANAGER, taskProperties);
 
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
index b772fcabd1..90c23361d1 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
@@ -34,6 +34,7 @@ import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
 import org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
 import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.stream.OffsetCriteria;
 import org.apache.pinot.spi.stream.PartitionLevelStreamConfig;
 import org.apache.pinot.spi.utils.IngestionConfigUtils;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
@@ -55,6 +56,7 @@ public class RealtimeSegmentValidationManager extends ControllerPeriodicTask<Rea
   private long _lastSegmentLevelValidationRunTimeMs = 0L;
 
   public static final String RECREATE_DELETED_CONSUMING_SEGMENT_KEY = "recreateDeletedConsumingSegment";
+  public static final String OFFSET_CRITERIA = "offsetCriteria";
 
   public RealtimeSegmentValidationManager(ControllerConf config, PinotHelixResourceManager pinotHelixResourceManager,
       LeadControllerManager leadControllerManager, PinotLLCRealtimeSegmentManager llcRealtimeSegmentManager,
@@ -82,7 +84,10 @@ public class RealtimeSegmentValidationManager extends ControllerPeriodicTask<Rea
     }
     context._recreateDeletedConsumingSegment =
         Boolean.parseBoolean(periodicTaskProperties.getProperty(RECREATE_DELETED_CONSUMING_SEGMENT_KEY));
-
+    String offsetCriteriaStr = periodicTaskProperties.getProperty(OFFSET_CRITERIA);
+    if (offsetCriteriaStr != null) {
+      context._offsetCriteria = new OffsetCriteria.OffsetCriteriaBuilder().withOffsetString(offsetCriteriaStr);
+    }
     return context;
   }
 
@@ -106,7 +111,7 @@ public class RealtimeSegmentValidationManager extends ControllerPeriodicTask<Rea
 
     if (streamConfig.hasLowLevelConsumerType()) {
       _llcRealtimeSegmentManager.ensureAllPartitionsConsuming(tableConfig, streamConfig,
-          context._recreateDeletedConsumingSegment);
+          context._recreateDeletedConsumingSegment, context._offsetCriteria);
     }
   }
 
@@ -178,6 +183,7 @@ public class RealtimeSegmentValidationManager extends ControllerPeriodicTask<Rea
   public static final class Context {
     private boolean _runSegmentLevelValidation;
     private boolean _recreateDeletedConsumingSegment;
+    private OffsetCriteria _offsetCriteria;
   }
 
   @VisibleForTesting
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index 14ee8970ec..eee1f2a8a8 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -888,7 +888,8 @@ public class PinotLLCRealtimeSegmentManagerTest {
       // Expected
     }
     try {
-      segmentManager.ensureAllPartitionsConsuming(segmentManager._tableConfig, segmentManager._streamConfig, false);
+      segmentManager.ensureAllPartitionsConsuming(segmentManager._tableConfig, segmentManager._streamConfig, false,
+          null);
       fail();
     } catch (IllegalStateException e) {
       // Expected
@@ -1115,7 +1116,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
 
     public void ensureAllPartitionsConsuming() {
       ensureAllPartitionsConsuming(_tableConfig, _streamConfig, _idealState,
-          getNewPartitionGroupMetadataList(_streamConfig, Collections.emptyList()), false);
+          getNewPartitionGroupMetadataList(_streamConfig, Collections.emptyList()), false, null);
     }
 
     @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org