Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2022/04/22 19:40:04 UTC

[GitHub] [pinot] sajjad-moradi opened a new pull request, #8584: Enable uploading segments for realtime tables

sajjad-moradi opened a new pull request, #8584:
URL: https://github.com/apache/pinot/pull/8584

   # Description
   Currently, the upload endpoint on the controller accepts segments only for offline tables and upsert-enabled realtime tables. This PR extends it to all realtime tables.
   Corresponding issue: #8283 
   
   # Testing Done
   * Extended LLC realtime integration tests to upload segments to verify both refresh and new segment upload paths
   * Extended current unit tests for both segment assignment and rebalance
   
   # Release Notes
   Segment upload is now enabled for all realtime tables. Previously, segments could only be uploaded to upsert-enabled realtime tables. This feature unblocks the following for realtime tables:
   - Bootstrap a realtime table
   - Backfill
   - Segment merge & rollup
   - Segment purge & conversion
   
   With segment upload support, we can remove most of the restrictions on realtime tables and make long-retention realtime tables much more manageable.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] Jackie-Jiang commented on a diff in pull request #8584: Enable uploading segments to realtime tables

Posted by GitBox <gi...@apache.org>.
Jackie-Jiang commented on code in PR #8584:
URL: https://github.com/apache/pinot/pull/8584#discussion_r864295648


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java:
##########
@@ -104,18 +104,15 @@ public void init(HelixManager helixManager, TableConfig tableConfig) {
   @Override
   public List<String> assignSegment(String segmentName, Map<String, Map<String, String>> currentAssignment,
       Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) {
-    InstancePartitions instancePartitions = instancePartitionsMap.get(InstancePartitionsType.CONSUMING);
-    Preconditions.checkState(instancePartitions != null, "Failed to find CONSUMING instance partitions for table: %s",
-        _realtimeTableName);
+    Preconditions.checkState(instancePartitionsMap.size() == 1, "One instance partition type should be provided");
+    InstancePartitions instancePartitions = instancePartitionsMap.entrySet().iterator().next().getValue();

Review Comment:
   We should check the `InstancePartitionsType` of the entry to decide how to assign the segment. The current logic handles the `CONSUMING` type only; we should add a path for the `COMPLETED` type, similar to `OfflineSegmentAssignment.assignSegment()`.
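
   A minimal sketch of the dispatch described above, using hypothetical stand-ins: `PartitionsType` for `InstancePartitionsType`, and a plain `List<String>` of server names for `InstancePartitions`. The two per-type helpers are placeholders, not Pinot's actual assignment rules; a real `COMPLETED` path would mirror `OfflineSegmentAssignment.assignSegment()`.

   ```java
   import java.util.ArrayList;
   import java.util.Collections;
   import java.util.List;
   import java.util.Map;

   // Hypothetical stand-in for Pinot's InstancePartitionsType.
   enum PartitionsType { CONSUMING, COMPLETED }

   final class RealtimeAssignmentSketch {
     // Reads the single (type, instances) entry and picks the assignment path from its key.
     static List<String> assignSegment(String segmentName, Map<PartitionsType, List<String>> instancePartitionsMap) {
       if (instancePartitionsMap.size() != 1) {
         throw new IllegalStateException("One instance partition type should be provided");
       }
       Map.Entry<PartitionsType, List<String>> entry = instancePartitionsMap.entrySet().iterator().next();
       switch (entry.getKey()) {
         case CONSUMING:
           return assignConsumingSegment(segmentName, entry.getValue());
         case COMPLETED:
           return assignCompletedSegment(segmentName, entry.getValue());
         default:
           throw new IllegalStateException("Unsupported instance partitions type: " + entry.getKey());
       }
     }

     // Placeholder (hypothetical rule): pin the segment to the first instance.
     private static List<String> assignConsumingSegment(String segmentName, List<String> instances) {
       return Collections.singletonList(instances.get(0));
     }

     // Placeholder (hypothetical rule): place the segment on every instance, standing in for offline-style assignment.
     private static List<String> assignCompletedSegment(String segmentName, List<String> instances) {
       return new ArrayList<>(instances);
     }
   }
   ```

   Reading the key and value from one `Map.Entry` avoids a second map lookup and makes the "exactly one type" contract explicit.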



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -1995,23 +1992,23 @@ public SegmentZKMetadata constructZkMetadataForNewSegment(String tableNameWithTy
   public void assignTableSegment(String tableNameWithType, String segmentName) {
     String segmentZKMetadataPath =
         ZKMetadataProvider.constructPropertyStorePathForSegment(tableNameWithType, segmentName);
-    InstancePartitionsType instancePartitionsType;
-    if (TableNameBuilder.isRealtimeTableResource(tableNameWithType)) {
-      // In an upsert enabled LLC realtime table, all segments of the same partition are collocated on the same server
-      // -- consuming or completed. So it is fine to use CONSUMING as the InstancePartitionsType.
-      // TODO When upload segments is open to all realtime tables, we should change the type to COMPLETED instead.
-      // In addition, RealtimeSegmentAssignment.assignSegment(..) method should be updated so that the method does not
-      // assign segments to CONSUMING instance partition only.
-      instancePartitionsType = InstancePartitionsType.CONSUMING;
-    } else {
-      instancePartitionsType = InstancePartitionsType.OFFLINE;
-    }
 
     // Assign instances for the segment and add it into IdealState
     try {
       TableConfig tableConfig = getTableConfig(tableNameWithType);
       Preconditions.checkState(tableConfig != null, "Failed to find table config for table: " + tableNameWithType);
-      SegmentAssignment segmentAssignment = SegmentAssignmentFactory.getSegmentAssignment(_helixZkManager, tableConfig);
+      InstancePartitionsType instancePartitionsType;
+      if (TableNameBuilder.isRealtimeTableResource(tableNameWithType)) {

Review Comment:
   We should use the `COMPLETED` type only if `COMPLETED` instance partitions exist or the tag override is configured. If segment relocation is not configured, we should follow the same assignment as the `CONSUMING` segments.
   Let's keep the comments for the upsert case. For upsert segments, we should always use the `CONSUMING` type.
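
   A minimal sketch of the selection rule above. The boolean inputs are hypothetical simplifications: in the controller, upsert status would come from the already-loaded table config, and the last two flags from whether `COMPLETED` instance partitions or a tag override exist. `UploadPartitionsType` is a stand-in for `InstancePartitionsType`.

   ```java
   // Hypothetical stand-in for Pinot's InstancePartitionsType.
   enum UploadPartitionsType { OFFLINE, CONSUMING, COMPLETED }

   final class UploadTypeChooser {
     static UploadPartitionsType choose(boolean isRealtimeTable, boolean isUpsertTable,
         boolean hasCompletedInstancePartitions, boolean hasCompletedTagOverride) {
       if (!isRealtimeTable) {
         return UploadPartitionsType.OFFLINE;
       }
       // Upsert: every segment of a partition must stay collocated with the consuming segment.
       if (isUpsertTable) {
         return UploadPartitionsType.CONSUMING;
       }
       // Relocation configured: place the uploaded segment alongside the other COMPLETED segments.
       if (hasCompletedInstancePartitions || hasCompletedTagOverride) {
         return UploadPartitionsType.COMPLETED;
       }
       // No relocation configured: follow the same assignment as CONSUMING segments.
       return UploadPartitionsType.CONSUMING;
     }
   }
   ```

   Checking upsert first ensures the collocation requirement wins even when relocation is configured.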



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java:
##########
@@ -359,6 +359,16 @@ public void addSegment(String segmentName, TableConfig tableConfig, IndexLoading
     _serverMetrics.addValueToTableGauge(_tableNameWithType, ServerGauge.SEGMENT_COUNT, 1L);
   }
 
+  /*
+   * This method is implemented to allow refreshing the segments in realtime tables.
+   */
+  @Override
+  public void addSegment(File indexDir, IndexLoadingConfig indexLoadingConfig)

Review Comment:
   Move this implementation to `BaseTableDataManager`. It is common to both offline and realtime tables.



##########
pinot-common/src/main/java/org/apache/pinot/common/utils/SegmentUtils.java:
##########
@@ -35,32 +35,27 @@ private SegmentUtils() {
   // Returns the partition id of a realtime segment based segment name and segment metadata info retrieved via Helix.
   // Important: The method is costly because it may read data from zookeeper. Do not use it in any query execution
   // path.
-  public static int getRealtimeSegmentPartitionId(String segmentName, String realtimeTableName,
-      HelixManager helixManager, String partitionColumn) {
+  public static @Nullable
+  Integer getRealtimeSegmentPartitionId(String segmentName, String realtimeTableName, HelixManager helixManager,

Review Comment:
   (convention)
   ```suggestion
     @Nullable
     public static Integer getRealtimeSegmentPartitionId(String segmentName, String realtimeTableName, HelixManager helixManager,
   ```




[GitHub] [pinot] Jackie-Jiang commented on a diff in pull request #8584: Enable uploading segments to realtime tables

Posted by GitBox <gi...@apache.org>.
Jackie-Jiang commented on code in PR #8584:
URL: https://github.com/apache/pinot/pull/8584#discussion_r856590568


##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java:
##########
@@ -92,12 +91,6 @@ public void completeSegmentOperations(String tableNameWithType, SegmentMetadata
           Response.Status.CONFLICT);
     }
 
-    // TODO Allow segment refreshing for realtime tables.
-    if (TableNameBuilder.isRealtimeTableResource(tableNameWithType)) {
-      throw new ControllerApplicationException(LOGGER,
-          "Refresh existing segment " + segmentName + " for realtime table " + tableNameWithType
-              + " is not yet supported ", Response.Status.NOT_IMPLEMENTED);
-    }

Review Comment:
   (minor) Remove one extra empty line



##########
pinot-common/src/main/java/org/apache/pinot/common/utils/SegmentUtils.java:
##########
@@ -35,20 +37,23 @@ private SegmentUtils() {
   // Returns the partition id of a realtime segment based segment name and segment metadata info retrieved via Helix.
   // Important: The method is costly because it may read data from zookeeper. Do not use it in any query execution
   // path.
-  public static int getRealtimeSegmentPartitionId(String segmentName, String realtimeTableName,
+  public static Optional<Integer> getRealtimeSegmentPartitionId(String segmentName, String realtimeTableName,
       HelixManager helixManager, String partitionColumn) {
     // A fast path if the segmentName is a LLC segment name and we can get the partition id from the name directly.
     if (LLCSegmentName.isLowLevelConsumerSegmentName(segmentName)) {
-      return new LLCSegmentName(segmentName).getPartitionGroupId();
+      return Optional.of(new LLCSegmentName(segmentName).getPartitionGroupId());
     }
-    // Otherwise, retrieve the partition id from the segment zk metadata. Currently only realtime segments from upsert
-    // enabled tables have partition ids in their segment metadata.
+    // Otherwise, retrieve the partition id from the segment zk metadata.
     SegmentZKMetadata segmentZKMetadata =
         ZKMetadataProvider.getSegmentZKMetadata(helixManager.getHelixPropertyStore(), realtimeTableName, segmentName);
     Preconditions
         .checkState(segmentZKMetadata != null, "Failed to find segment ZK metadata for segment: %s of table: %s",
             segmentName, realtimeTableName);
     SegmentPartitionMetadata segmentPartitionMetadata = segmentZKMetadata.getPartitionMetadata();
+    if (segmentPartitionMetadata == null

Review Comment:
   To be more robust, let's remove the checks and return the partition id when it is available, and `null` otherwise. The caller can then decide how to proceed.
   ```suggestion
       if (segmentPartitionMetadata != null) {
         ColumnPartitionMetadata columnPartitionMetadata =
           segmentPartitionMetadata.getColumnPartitionMap().get(partitionColumn);
         if (columnPartitionMetadata != null && columnPartitionMetadata.getPartitions().size() == 1) {
           return columnPartitionMetadata.getPartitions().iterator().next();
         }
       }
       return null;
   ```
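
   The same null-returning shape can be exercised in isolation. This sketch assumes a simplified model in which the segment's partition metadata is a `Map` from column name to the set of partition ids seen in that column, a hypothetical stand-in for `SegmentPartitionMetadata` and `ColumnPartitionMetadata`. A missing map, a missing column, or more than one partition all yield `null`.

   ```java
   import java.util.Map;
   import java.util.Set;

   final class PartitionIdLookup {
     // Returns the single partition id recorded for partitionColumn, or null if it cannot be determined.
     // (In Pinot code this return type would be annotated with @Nullable.)
     static Integer getPartitionId(Map<String, Set<Integer>> columnPartitionMap, String partitionColumn) {
       if (columnPartitionMap != null) {
         Set<Integer> partitions = columnPartitionMap.get(partitionColumn);
         if (partitions != null && partitions.size() == 1) {
           return partitions.iterator().next();
         }
       }
       return null;
     }
   }
   ```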



##########
pinot-common/src/main/java/org/apache/pinot/common/utils/SegmentUtils.java:
##########
@@ -35,20 +37,23 @@ private SegmentUtils() {
   // Returns the partition id of a realtime segment based segment name and segment metadata info retrieved via Helix.
   // Important: The method is costly because it may read data from zookeeper. Do not use it in any query execution
   // path.
-  public static int getRealtimeSegmentPartitionId(String segmentName, String realtimeTableName,
+  public static Optional<Integer> getRealtimeSegmentPartitionId(String segmentName, String realtimeTableName,

Review Comment:
   Suggest returning `Integer` (annotated with `@Nullable`) instead of `Optional`. Currently we usually use `null` to represent an unavailable value, and several `Optional` methods (e.g. `isEmpty()` and `ifPresentOrElse()`) are not available in JDK 8.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java:
##########
@@ -260,13 +277,26 @@ public Map<String, Map<String, String>> rebalanceTable(Map<String, Map<String, S
           "No COMPLETED instance partitions found, reassigning COMPLETED segments the same way as CONSUMING segments "
               + "with CONSUMING instance partitions for table: {}", _realtimeTableName);
 
+      Set<String> uploadedSegmentsWithNoPartitionMetadata = new TreeSet<>();
       newAssignment = new TreeMap<>();
       for (String segmentName : completedSegmentAssignment.keySet()) {
+        if (!SegmentUtils
+            .getRealtimeSegmentPartitionId(segmentName, _realtimeTableName, _helixManager, _partitionColumn)
+            .isPresent()) {
+          uploadedSegmentsWithNoPartitionMetadata.add(segmentName);
+          continue;
+        }
         List<String> instancesAssigned = assignConsumingSegment(segmentName, consumingInstancePartitions);
         Map<String, String> instanceStateMap =
             SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentStateModel.ONLINE);
         newAssignment.put(segmentName, instanceStateMap);
       }
+      for (String segmentName : uploadedSegmentsWithNoPartitionMetadata) {

Review Comment:
   We should not assign one by one here. Instead, we can create a new map `unpartitionedCompletedSegmentAssignment` containing only the unpartitioned segments, then use `reassignSegments()` on it to minimize data movement.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java:
##########
@@ -260,13 +277,26 @@ public Map<String, Map<String, String>> rebalanceTable(Map<String, Map<String, S
           "No COMPLETED instance partitions found, reassigning COMPLETED segments the same way as CONSUMING segments "
               + "with CONSUMING instance partitions for table: {}", _realtimeTableName);
 
+      Set<String> uploadedSegmentsWithNoPartitionMetadata = new TreeSet<>();

Review Comment:
   (minor) Rename to `unpartitionedSegments`



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -2011,7 +2003,8 @@ public void assignTableSegment(String tableNameWithType, String segmentName) {
     try {
       TableConfig tableConfig = getTableConfig(tableNameWithType);
       Preconditions.checkState(tableConfig != null, "Failed to find table config for table: " + tableNameWithType);
-      SegmentAssignment segmentAssignment = SegmentAssignmentFactory.getSegmentAssignment(_helixZkManager, tableConfig);
+      SegmentAssignment segmentAssignment =
+          SegmentAssignmentFactory.getSegmentAssignment(_helixZkManager, tableConfig, isUpsertTable(tableNameWithType));

Review Comment:
   Don't pass upsert info to the segment assignment engine, set `instancePartitionsType` to `CONSUMING` instead (same as the current code).
   Also, don't call `isUpsertTable()` because it will fetch another table config, use `tableConfig.getUpsertMode() != UpsertConfig.Mode.NONE` instead.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java:
##########
@@ -365,12 +405,32 @@ private List<String> assignCompletedSegment(String segmentName, Map<String, Map<
       // Replica-group based assignment
 
       // Uniformly spray the segment partitions over the instance partitions
-      int segmentPartitionId =
-          SegmentUtils.getRealtimeSegmentPartitionId(segmentName, _realtimeTableName, _helixManager, _partitionColumn);
-      int numPartitions = instancePartitions.getNumPartitions();
-      int partitionGroupId = segmentPartitionId % numPartitions;
+      int desiredPartitionId = getDesiredPartitionId(segmentName, instancePartitions, currentAssignment);
       return SegmentAssignmentUtils
-          .assignSegmentWithReplicaGroup(currentAssignment, instancePartitions, partitionGroupId);
+          .assignSegmentWithReplicaGroup(currentAssignment, instancePartitions, desiredPartitionId);
     }
   }
+
+  private int getDesiredPartitionId(String segmentName, InstancePartitions instancePartitions,
+      Map<String, Map<String, String>> currentAssignment) {
+    Optional<Integer> segmentPartitionIdOpt =
+        SegmentUtils.getRealtimeSegmentPartitionId(segmentName, _realtimeTableName, _helixManager, _partitionColumn);
+    int numPartitions = instancePartitions.getNumPartitions();
+    return segmentPartitionIdOpt.map(segPartitionId -> segPartitionId % numPartitions).orElseGet(() -> {
+      // find the partition with the least number of segments
+      int minPartitionId = 0;
+      int minNumSegmentsAssigned = Integer.MAX_VALUE;
+      for (int partitionId = 0; partitionId < numPartitions; partitionId++) {
+        List<String> instances = instancePartitions.getInstances(partitionId, 0);
+        int[] numSegmentsAssignedPerInstance =
+            SegmentAssignmentUtils.getNumSegmentsAssignedPerInstance(currentAssignment, instances);

Review Comment:
   This method is quite expensive, especially for large tables. Calling it `numPartitions` times for every segment can cause performance issues. Instead, we can use the hash code as suggested elsewhere in this review.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java:
##########
@@ -345,6 +380,11 @@ private Map<String, Map<String, String>> reassignSegments(String instancePartiti
 
         newAssignment = SegmentAssignmentUtils
             .rebalanceReplicaGroupBasedTable(currentAssignment, instancePartitions, partitionGroupIdToSegmentsMap);
+        for (String segName : uploadedSegmentsWithNoPartitionMetadata) {

Review Comment:
   We should combine the unpartitioned segments into `partitionGroupIdToSegmentsMap` (i.e. assign a partition id to them). I'd suggest simply using `Math.abs(segmentName.hashCode() % numPartitions)` as the partition group id because it is deterministic. It needs to stay deterministic so that across multiple rebalances the segment does not switch partitions, which would force it to move between servers.
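
   A minimal sketch of folding unpartitioned segments into the partition-group map with a deterministic hash fallback. It assumes a hypothetical `knownPartitionIds` map (segment name to partition id, absent for uploaded segments without partition metadata) in place of the ZooKeeper lookup, and uses plain loops and null checks rather than functional APIs.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.Map;
   import java.util.TreeMap;

   final class PartitionGroupSketch {
     // Deterministic fallback: Math.abs is applied after the modulo, so the result is always in [0, numPartitions).
     static int hashPartitionGroupId(String segmentName, int numPartitions) {
       return Math.abs(segmentName.hashCode() % numPartitions);
     }

     // Groups segments by partition group id; segments without a known partition id fall back to the hash.
     static Map<Integer, List<String>> groupByPartition(List<String> segmentNames,
         Map<String, Integer> knownPartitionIds, int numPartitions) {
       Map<Integer, List<String>> partitionGroupIdToSegments = new TreeMap<>();
       for (String segmentName : segmentNames) {
         Integer partitionId = knownPartitionIds.get(segmentName);
         int partitionGroupId = partitionId != null ? partitionId % numPartitions
             : hashPartitionGroupId(segmentName, numPartitions);
         List<String> segments = partitionGroupIdToSegments.get(partitionGroupId);
         if (segments == null) {
           segments = new ArrayList<>();
           partitionGroupIdToSegments.put(partitionGroupId, segments);
         }
         segments.add(segmentName);
       }
       return partitionGroupIdToSegments;
     }
   }
   ```

   Note that `Math.abs(segmentName.hashCode()) % numPartitions` is not equivalent: `Math.abs(Integer.MIN_VALUE)` is still `Integer.MIN_VALUE`, so that form can yield a negative id. Taking the modulo first (or using `Math.floorMod`) keeps the id in range.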



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java:
##########
@@ -101,21 +110,26 @@ public void init(HelixManager helixManager, TableConfig tableConfig) {
         _replication, _partitionColumn, _realtimeTableName);
   }
 
+  /**
+   * Since the segment can be either a completed or a consuming segment, the type of the segment is inferred by the
+   * provided instancePartitionsMap, which has to contain only one InstancePartitionsType.
+   */
   @Override
   public List<String> assignSegment(String segmentName, Map<String, Map<String, String>> currentAssignment,
       Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) {
-    InstancePartitions instancePartitions = instancePartitionsMap.get(InstancePartitionsType.CONSUMING);
-    Preconditions.checkState(instancePartitions != null, "Failed to find CONSUMING instance partitions for table: %s",
-        _realtimeTableName);
+    Preconditions
+        .checkState(instancePartitionsMap.keySet().size() == 1, "One instance partition type should be provided");
+    InstancePartitionsType instancePartitionsType = instancePartitionsMap.keySet().stream().findFirst().get();
+    InstancePartitions instancePartitions = instancePartitionsMap.get(instancePartitionsType);

Review Comment:
   ```suggestion
       Preconditions.checkState(instancePartitionsMap.size() == 1, "One instance partition type should be provided");
       Map.Entry<InstancePartitionsType, InstancePartitions> entry = instancePartitionsMap.entrySet().iterator().next();
       InstancePartitionsType instancePartitionsType = entry.getKey();
       InstancePartitions instancePartitions = entry.getValue();
   ```



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java:
##########
@@ -365,12 +405,32 @@ private List<String> assignCompletedSegment(String segmentName, Map<String, Map<
       // Replica-group based assignment
 
       // Uniformly spray the segment partitions over the instance partitions
-      int segmentPartitionId =
-          SegmentUtils.getRealtimeSegmentPartitionId(segmentName, _realtimeTableName, _helixManager, _partitionColumn);
-      int numPartitions = instancePartitions.getNumPartitions();
-      int partitionGroupId = segmentPartitionId % numPartitions;
+      int desiredPartitionId = getDesiredPartitionId(segmentName, instancePartitions, currentAssignment);
       return SegmentAssignmentUtils
-          .assignSegmentWithReplicaGroup(currentAssignment, instancePartitions, partitionGroupId);
+          .assignSegmentWithReplicaGroup(currentAssignment, instancePartitions, desiredPartitionId);
     }
   }
+
+  private int getDesiredPartitionId(String segmentName, InstancePartitions instancePartitions,
+      Map<String, Map<String, String>> currentAssignment) {
+    Optional<Integer> segmentPartitionIdOpt =
+        SegmentUtils.getRealtimeSegmentPartitionId(segmentName, _realtimeTableName, _helixManager, _partitionColumn);
+    int numPartitions = instancePartitions.getNumPartitions();
+    return segmentPartitionIdOpt.map(segPartitionId -> segPartitionId % numPartitions).orElseGet(() -> {

Review Comment:
   Suggest not using functional APIs. Even though this method is not very performance-critical, it is still called very frequently when rebalancing the table.




[GitHub] [pinot] sajjad-moradi commented on a diff in pull request #8584: Enable uploading segments to realtime tables

Posted by GitBox <gi...@apache.org>.
sajjad-moradi commented on code in PR #8584:
URL: https://github.com/apache/pinot/pull/8584#discussion_r862282194


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java:
##########
@@ -368,12 +364,19 @@ private List<String> assignCompletedSegment(String segmentName, Map<String, Map<
       // Replica-group based assignment
 
       // Uniformly spray the segment partitions over the instance partitions
-      int segmentPartitionId =
-          SegmentUtils.getRealtimeSegmentPartitionId(segmentName, _realtimeTableName, _helixManager, _partitionColumn);
-      int numPartitions = instancePartitions.getNumPartitions();
-      int partitionGroupId = segmentPartitionId % numPartitions;
-      return SegmentAssignmentUtils
-          .assignSegmentWithReplicaGroup(currentAssignment, instancePartitions, partitionGroupId);
+      int partitionId = getPartitionGroupId(segmentName) % instancePartitions.getNumPartitions();
+      return SegmentAssignmentUtils.assignSegmentWithReplicaGroup(currentAssignment, instancePartitions, partitionId);
+    }
+  }
+
+  private int getPartitionGroupId(String segmentName) {
+    Integer segmentPartitionId =
+        SegmentUtils.getRealtimeSegmentPartitionId(segmentName, _realtimeTableName, _helixManager, _partitionColumn);
+    if (segmentPartitionId == null) {
+      // This case is for the uploaded segments for which there's no partition information
+      // Choose a random, but consistent, partition id
+      segmentPartitionId = Math.abs(segmentName.hashCode());

Review Comment:
   Good catch. I'll update with an upper bound.




[GitHub] [pinot] sajjad-moradi commented on a diff in pull request #8584: Enable uploading segments to realtime tables

Posted by GitBox <gi...@apache.org>.
sajjad-moradi commented on code in PR #8584:
URL: https://github.com/apache/pinot/pull/8584#discussion_r872666499


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -1995,23 +1992,23 @@ public SegmentZKMetadata constructZkMetadataForNewSegment(String tableNameWithTy
   public void assignTableSegment(String tableNameWithType, String segmentName) {
     String segmentZKMetadataPath =
         ZKMetadataProvider.constructPropertyStorePathForSegment(tableNameWithType, segmentName);
-    InstancePartitionsType instancePartitionsType;
-    if (TableNameBuilder.isRealtimeTableResource(tableNameWithType)) {
-      // In an upsert enabled LLC realtime table, all segments of the same partition are collocated on the same server
-      // -- consuming or completed. So it is fine to use CONSUMING as the InstancePartitionsType.
-      // TODO When upload segments is open to all realtime tables, we should change the type to COMPLETED instead.
-      // In addition, RealtimeSegmentAssignment.assignSegment(..) method should be updated so that the method does not
-      // assign segments to CONSUMING instance partition only.
-      instancePartitionsType = InstancePartitionsType.CONSUMING;
-    } else {
-      instancePartitionsType = InstancePartitionsType.OFFLINE;
-    }
 
     // Assign instances for the segment and add it into IdealState
     try {
       TableConfig tableConfig = getTableConfig(tableNameWithType);
       Preconditions.checkState(tableConfig != null, "Failed to find table config for table: " + tableNameWithType);
-      SegmentAssignment segmentAssignment = SegmentAssignmentFactory.getSegmentAssignment(_helixZkManager, tableConfig);
+      InstancePartitionsType instancePartitionsType;
+      if (TableNameBuilder.isRealtimeTableResource(tableNameWithType)) {

Review Comment:
   Makes sense. Refactored.




[GitHub] [pinot] codecov-commenter commented on pull request #8584: Enable uploading segments to realtime tables

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on PR #8584:
URL: https://github.com/apache/pinot/pull/8584#issuecomment-1106917323

   # [Codecov](https://codecov.io/gh/apache/pinot/pull/8584?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#8584](https://codecov.io/gh/apache/pinot/pull/8584?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (8ad458e) into [master](https://codecov.io/gh/apache/pinot/commit/f200ec515630442f95f6fc55f568b00f7cb04f4f?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (f200ec5) will **decrease** coverage by `6.77%`.
   > The diff coverage is `69.13%`.
   
   > :exclamation: Current head 8ad458e differs from pull request most recent head 8c0cf76. Consider uploading reports for the commit 8c0cf76 to get more accurate results
   
   ```diff
   @@             Coverage Diff              @@
   ##             master    #8584      +/-   ##
   ============================================
   - Coverage     70.75%   63.98%   -6.78%     
   - Complexity     4315     4317       +2     
   ============================================
     Files          1691     1645      -46     
     Lines         88606    86730    -1876     
     Branches      13437    13236     -201     
   ============================================
   - Hits          62690    55491    -7199     
   - Misses        21534    27223    +5689     
   + Partials       4382     4016     -366     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | integration1 | `?` | |
   | integration2 | `?` | |
   | unittests1 | `66.97% <0.00%> (+0.02%)` | :arrow_up: |
   | unittests2 | `14.14% <69.13%> (+0.03%)` | :arrow_up: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/pinot/pull/8584?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...va/org/apache/pinot/common/utils/SegmentUtils.java](https://codecov.io/gh/apache/pinot/pull/8584/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vdXRpbHMvU2VnbWVudFV0aWxzLmphdmE=) | `0.00% <0.00%> (-71.43%)` | :arrow_down: |
   | [...ces/PinotSegmentUploadDownloadRestletResource.java](https://codecov.io/gh/apache/pinot/pull/8584/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29udHJvbGxlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29udHJvbGxlci9hcGkvcmVzb3VyY2VzL1Bpbm90U2VnbWVudFVwbG9hZERvd25sb2FkUmVzdGxldFJlc291cmNlLmphdmE=) | `41.84% <ø> (-16.26%)` | :arrow_down: |
   | [...apache/pinot/controller/api/upload/ZKOperator.java](https://codecov.io/gh/apache/pinot/pull/8584/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29udHJvbGxlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29udHJvbGxlci9hcGkvdXBsb2FkL1pLT3BlcmF0b3IuamF2YQ==) | `65.60% <ø> (-10.78%)` | :arrow_down: |
   | [...inot/controller/helix/ControllerRequestClient.java](https://codecov.io/gh/apache/pinot/pull/8584/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29udHJvbGxlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29udHJvbGxlci9oZWxpeC9Db250cm9sbGVyUmVxdWVzdENsaWVudC5qYXZh) | `51.00% <0.00%> (-12.00%)` | :arrow_down: |
   | [...ata/manager/realtime/RealtimeTableDataManager.java](https://codecov.io/gh/apache/pinot/pull/8584/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9kYXRhL21hbmFnZXIvcmVhbHRpbWUvUmVhbHRpbWVUYWJsZURhdGFNYW5hZ2VyLmphdmE=) | `11.55% <0.00%> (-56.72%)` | :arrow_down: |
   | [...in/java/org/apache/pinot/spi/utils/StringUtil.java](https://codecov.io/gh/apache/pinot/pull/8584/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc3BpL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9zcGkvdXRpbHMvU3RyaW5nVXRpbC5qYXZh) | `100.00% <ø> (ø)` | |
   | [...spi/utils/builder/ControllerRequestURLBuilder.java](https://codecov.io/gh/apache/pinot/pull/8584/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc3BpL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9zcGkvdXRpbHMvYnVpbGRlci9Db250cm9sbGVyUmVxdWVzdFVSTEJ1aWxkZXIuamF2YQ==) | `0.00% <0.00%> (ø)` | |
   | [.../assignment/segment/RealtimeSegmentAssignment.java](https://codecov.io/gh/apache/pinot/pull/8584/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29udHJvbGxlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29udHJvbGxlci9oZWxpeC9jb3JlL2Fzc2lnbm1lbnQvc2VnbWVudC9SZWFsdGltZVNlZ21lbnRBc3NpZ25tZW50LmphdmE=) | `94.24% <92.72%> (+0.28%)` | :arrow_up: |
   | [...ntroller/helix/core/PinotHelixResourceManager.java](https://codecov.io/gh/apache/pinot/pull/8584/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29udHJvbGxlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29udHJvbGxlci9oZWxpeC9jb3JlL1Bpbm90SGVsaXhSZXNvdXJjZU1hbmFnZXIuamF2YQ==) | `64.38% <100.00%> (-3.23%)` | :arrow_down: |
   | [...e/assignment/segment/SegmentAssignmentFactory.java](https://codecov.io/gh/apache/pinot/pull/8584/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29udHJvbGxlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29udHJvbGxlci9oZWxpeC9jb3JlL2Fzc2lnbm1lbnQvc2VnbWVudC9TZWdtZW50QXNzaWdubWVudEZhY3RvcnkuamF2YQ==) | `100.00% <100.00%> (ø)` | |
   | ... and [391 more](https://codecov.io/gh/apache/pinot/pull/8584/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/pinot/pull/8584?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/pinot/pull/8584?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [f200ec5...8c0cf76](https://codecov.io/gh/apache/pinot/pull/8584?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   




[GitHub] [pinot] Jackie-Jiang commented on a diff in pull request #8584: Enable uploading segments to realtime tables

Posted by GitBox <gi...@apache.org>.
Jackie-Jiang commented on code in PR #8584:
URL: https://github.com/apache/pinot/pull/8584#discussion_r874240901


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -2048,6 +2034,39 @@ public void assignTableSegment(String tableNameWithType, String segmentName) {
     }
   }
 
+  private Map<InstancePartitionsType, InstancePartitions> fetchOrComputeInstancePartitions(String tableNameWithType,
+      TableConfig tableConfig) {
+    boolean isOfflineTable = !TableNameBuilder.isRealtimeTableResource(tableNameWithType);
+    boolean isUpsertTable = !isOfflineTable && tableConfig.getUpsertMode() != UpsertConfig.Mode.NONE;
+    InstancePartitionsType instancePartitionsType = null;
+    if (isOfflineTable) {
+      instancePartitionsType = InstancePartitionsType.OFFLINE;
+    } else if (isUpsertTable) {
+      // In an upsert enabled LLC realtime table, all segments of the same partition are collocated on the same server
+      // -- consuming or completed. So it is fine to use CONSUMING as the InstancePartitionsType.
+      instancePartitionsType = InstancePartitionsType.CONSUMING;
+    }
+    if (isOfflineTable || isUpsertTable) {
+      return Collections.singletonMap(instancePartitionsType, InstancePartitionsUtils
+          .fetchOrComputeInstancePartitions(_helixZkManager, tableConfig, instancePartitionsType));
+    }

Review Comment:
   (minor) slightly more readable to me
   ```suggestion
       if (TableNameBuilder.isOfflineTableResource(tableNameWithType)) {
         return Collections.singletonMap(InstancePartitionsType.OFFLINE, InstancePartitionsUtils
             .fetchOrComputeInstancePartitions(_helixZkManager, tableConfig, InstancePartitionsType.OFFLINE));
       }
       if (tableConfig.getUpsertMode() != UpsertConfig.Mode.NONE) {
         // In an upsert enabled LLC realtime table, all segments of the same partition are collocated on the same server
         // -- consuming or completed. So it is fine to use CONSUMING as the InstancePartitionsType.
         return Collections.singletonMap(InstancePartitionsType.CONSUMING, InstancePartitionsUtils
             .fetchOrComputeInstancePartitions(_helixZkManager, tableConfig, InstancePartitionsType.CONSUMING));
       }
   ```



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java:
##########
@@ -369,8 +369,10 @@ public void addSegment(ImmutableSegment immutableSegment) {
 
   private void handleUpsert(ImmutableSegmentImpl immutableSegment) {
     String segmentName = immutableSegment.getSegmentName();
-    int partitionGroupId = SegmentUtils
+    Integer partitionGroupId = SegmentUtils
         .getRealtimeSegmentPartitionId(segmentName, _tableNameWithType, _helixManager, _primaryKeyColumns.get(0));
+    Preconditions.checkNotNull(partitionGroupId,
+        String.format("PartitionGroupId is not available for segment '%s' (upsert-enabled table)", segmentName));

Review Comment:
   (minor) Also include the table name
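
   A minimal sketch of the null check with the table name included, as the reviewer suggests. Guava's `Preconditions.checkNotNull(reference, errorMessage)` validates its first argument, so the partition id itself goes first and the message second. To stay dependency-free, the sketch uses the JDK's `Objects.requireNonNull` instead; the class and method names are hypothetical, not Pinot code.

   ```java
   import java.util.Objects;

   // Hypothetical helper illustrating the null check; not part of Pinot.
   final class UpsertChecks {
     private UpsertChecks() {
     }

     // Returns the partition group id, or throws NullPointerException whose message
     // names both the segment and the table, so the failing table is easy to identify.
     static int requirePartitionGroupId(Integer partitionGroupId, String segmentName, String tableNameWithType) {
       Objects.requireNonNull(partitionGroupId, () -> String.format(
           "PartitionGroupId is not available for segment: '%s' (upsert-enabled table: %s)", segmentName,
           tableNameWithType));
       return partitionGroupId;
     }
   }
   ```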





[GitHub] [pinot] sajjad-moradi merged pull request #8584: Enable uploading segments to realtime tables

Posted by GitBox <gi...@apache.org>.
sajjad-moradi merged PR #8584:
URL: https://github.com/apache/pinot/pull/8584




[GitHub] [pinot] jtao15 commented on a diff in pull request #8584: Enable uploading segments to realtime tables

Posted by GitBox <gi...@apache.org>.
jtao15 commented on code in PR #8584:
URL: https://github.com/apache/pinot/pull/8584#discussion_r861386703


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java:
##########
@@ -368,12 +364,19 @@ private List<String> assignCompletedSegment(String segmentName, Map<String, Map<
       // Replica-group based assignment
 
       // Uniformly spray the segment partitions over the instance partitions
-      int segmentPartitionId =
-          SegmentUtils.getRealtimeSegmentPartitionId(segmentName, _realtimeTableName, _helixManager, _partitionColumn);
-      int numPartitions = instancePartitions.getNumPartitions();
-      int partitionGroupId = segmentPartitionId % numPartitions;
-      return SegmentAssignmentUtils
-          .assignSegmentWithReplicaGroup(currentAssignment, instancePartitions, partitionGroupId);
+      int partitionId = getPartitionGroupId(segmentName) % instancePartitions.getNumPartitions();
+      return SegmentAssignmentUtils.assignSegmentWithReplicaGroup(currentAssignment, instancePartitions, partitionId);
+    }
+  }
+
+  private int getPartitionGroupId(String segmentName) {
+    Integer segmentPartitionId =
+        SegmentUtils.getRealtimeSegmentPartitionId(segmentName, _realtimeTableName, _helixManager, _partitionColumn);
+    if (segmentPartitionId == null) {
+      // This case is for the uploaded segments for which there's no partition information
+      // Choose a random, but consistent, partition id
+      segmentPartitionId = Math.abs(segmentName.hashCode());

Review Comment:
   Can `segmentName.hashCode()` be `Integer.MIN_VALUE`? If so, `segmentPartitionId` would be negative, because `Math.abs(Integer.MIN_VALUE)` overflows and returns `Integer.MIN_VALUE`. Is this expected?
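
   The edge case can be reproduced in isolation. This sketch (hypothetical class name) passes `Integer.MIN_VALUE` directly in place of a segment-name hash, then applies the same `% numPartitions` step used in `assignCompletedSegment`:

   ```java
   // Reproduces the Math.abs(Integer.MIN_VALUE) edge case raised in the review.
   final class AbsHashEdgeCase {
     private AbsHashEdgeCase() {
     }

     // Mirrors the diff's fallback, Math.abs(segmentName.hashCode()), taking the hash directly.
     static int naivePartitionGroupId(int hashCode) {
       return Math.abs(hashCode);
     }

     // Mirrors assignCompletedSegment: partition group id modulo the number of instance partitions.
     static int partitionId(int partitionGroupId, int numPartitions) {
       return partitionGroupId % numPartitions;
     }

     public static void main(String[] args) {
       int groupId = naivePartitionGroupId(Integer.MIN_VALUE);
       System.out.println(groupId);                 // -2147483648
       System.out.println(partitionId(groupId, 3)); // -2
     }
   }
   ```

   A negative partition id like this would not correspond to any valid instance partition.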



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java:
##########
@@ -158,7 +153,8 @@ private List<String> assignConsumingSegment(String segmentName, InstancePartitio
       int numInstances = instances.size();
       List<String> instancesAssigned = new ArrayList<>(_replication);
       for (int replicaId = 0; replicaId < _replication; replicaId++) {
-        instancesAssigned.add(instances.get((partitionGroupId * _replication + replicaId) % numInstances));
+        int instanceIndex = Math.abs(partitionGroupId * _replication + replicaId) % numInstances;

Review Comment:
   Does `getPartitionGroupId()` aim to return a non-negative value? If so, we don't need Math.abs() here.





[GitHub] [pinot] jackjlli commented on a diff in pull request #8584: Enable uploading segments to realtime tables

Posted by GitBox <gi...@apache.org>.
jackjlli commented on code in PR #8584:
URL: https://github.com/apache/pinot/pull/8584#discussion_r858106591


##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java:
##########
@@ -103,6 +112,84 @@ protected void overrideServerConf(PinotConfiguration configuration) {
     }
   }
 
+  @Override
+  protected void createSegmentsAndUpload(List<File> avroFiles, Schema schema, TableConfig tableConfig)
+      throws Exception {
+    if (!_tarDir.exists()) {
+      _tarDir.mkdir();
+    }
+    if (!_segmentDir.exists()) {
+      _segmentDir.mkdir();
+    }
+
+    // create segments out of the avro files (segments will be placed in _tarDir)
+    List<File> copyOfAvroFiles = new ArrayList<>(avroFiles);
+    ClusterIntegrationTestUtils.buildSegmentsFromAvro(copyOfAvroFiles, tableConfig, schema, 0, _segmentDir, _tarDir);
+
+    // upload segments to controller
+    uploadSegmentsToController(getTableName(), _tarDir, false, false);
+
+    // upload the first segment again to verify refresh
+    uploadSegmentsToController(getTableName(), _tarDir, true, false);
+
+    // upload the first segment again to verify refresh with different segment crc
+    uploadSegmentsToController(getTableName(), _tarDir, true, true);
+
+    // add avro files to the original list so H2 will have the uploaded data as well
+    avroFiles.addAll(copyOfAvroFiles);
+  }
+
+  private void uploadSegmentsToController(String tableName, File tarDir, boolean onlyFirstSegment, boolean changeCrc)
+      throws Exception {
+    File[] segmentTarFiles = tarDir.listFiles();
+    assertNotNull(segmentTarFiles);
+    int numSegments = segmentTarFiles.length;
+    assertTrue(numSegments > 0);
+    if (onlyFirstSegment) {
+      numSegments = 1;
+    }
+    URI uploadSegmentHttpURI = FileUploadDownloadClient.getUploadSegmentHttpURI(LOCAL_HOST, _controllerPort);
+    try (FileUploadDownloadClient fileUploadDownloadClient = new FileUploadDownloadClient()) {
+      if (numSegments == 1) {
+        File segmentTarFile = segmentTarFiles[0];
+        if (changeCrc) {
+          changeCrcInSegmentZKMetadata(tableName, segmentTarFile.toString());
+        }
+        assertEquals(fileUploadDownloadClient
+            .uploadSegment(uploadSegmentHttpURI, segmentTarFile.getName(), segmentTarFile, tableName,
+                TableType.REALTIME).getStatusCode(), HttpStatus.SC_OK);
+      } else {
+        // Upload segments in parallel
+        ExecutorService executorService = Executors.newFixedThreadPool(numSegments);
+        List<Future<Integer>> futures = new ArrayList<>(numSegments);
+        for (File segmentTarFile : segmentTarFiles) {
+          futures.add(executorService.submit(() -> fileUploadDownloadClient
+              .uploadSegment(uploadSegmentHttpURI, segmentTarFile.getName(), segmentTarFile, tableName,
+                  TableType.REALTIME).getStatusCode()));
+        }
+        executorService.shutdown();
+        for (Future<Integer> future : futures) {
+          assertEquals((int) future.get(), HttpStatus.SC_OK);
+        }
+      }
+    }
+  }
+
+  private void changeCrcInSegmentZKMetadata(String tableName, String segmentFilePath) {
+    int startIdx = segmentFilePath.indexOf("mytable_");
+    int endIdx = segmentFilePath.indexOf(".tar.gz");
+    String segmentName = segmentFilePath.substring(startIdx, endIdx);
+    String tableNameWithType = TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(tableName);
+    SegmentZKMetadata segmentZKMetadata = _helixResourceManager.getSegmentZKMetadata(tableNameWithType, segmentName);
+    segmentZKMetadata.setCrc(111L);
+    _helixResourceManager.updateZkMetadata(tableNameWithType, segmentZKMetadata);
+  }
+
+  @Override
+  protected long getCountStarResult() {
+    return super.getCountStarResult() * 2;

Review Comment:
   We probably need a comment explaining why the count star result is doubled.





[GitHub] [pinot] Jackie-Jiang commented on a diff in pull request #8584: Enable uploading segments to realtime tables

Posted by GitBox <gi...@apache.org>.
Jackie-Jiang commented on code in PR #8584:
URL: https://github.com/apache/pinot/pull/8584#discussion_r862273056


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java:
##########
@@ -368,12 +364,19 @@ private List<String> assignCompletedSegment(String segmentName, Map<String, Map<
       // Replica-group based assignment
 
       // Uniformly spray the segment partitions over the instance partitions
-      int segmentPartitionId =
-          SegmentUtils.getRealtimeSegmentPartitionId(segmentName, _realtimeTableName, _helixManager, _partitionColumn);
-      int numPartitions = instancePartitions.getNumPartitions();
-      int partitionGroupId = segmentPartitionId % numPartitions;
-      return SegmentAssignmentUtils
-          .assignSegmentWithReplicaGroup(currentAssignment, instancePartitions, partitionGroupId);
+      int partitionId = getPartitionGroupId(segmentName) % instancePartitions.getNumPartitions();
+      return SegmentAssignmentUtils.assignSegmentWithReplicaGroup(currentAssignment, instancePartitions, partitionId);
+    }
+  }
+
+  private int getPartitionGroupId(String segmentName) {
+    Integer segmentPartitionId =
+        SegmentUtils.getRealtimeSegmentPartitionId(segmentName, _realtimeTableName, _helixManager, _partitionColumn);
+    if (segmentPartitionId == null) {
+      // This case is for the uploaded segments for which there's no partition information
+      // Choose a random, but consistent, partition id
+      segmentPartitionId = Math.abs(segmentName.hashCode());

Review Comment:
   +1. We could bound the partition group id to some large number (e.g. `Math.abs(segmentName.hashCode() % 10000)`).
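
   A sketch of the bounded fallback, assuming the `10000` bound from the comment. Taking the remainder first keeps the value in (-10000, 10000), so `Math.abs` never receives `Integer.MIN_VALUE`; the result lies in [0, 10000) and is the same every time for a given segment name. The class and constant names are illustrative, not Pinot code.

   ```java
   // Hypothetical helper showing the reviewer's bounded partition group id.
   final class BoundedPartitionGroupId {
     // Bound taken from the review comment; not a Pinot constant.
     static final int MAX_PARTITION_GROUP_ID = 10000;

     private BoundedPartitionGroupId() {
     }

     // hashCode % 10000 is in (-10000, 10000), so Math.abs cannot overflow here.
     static int fromHash(int hashCode) {
       return Math.abs(hashCode % MAX_PARTITION_GROUP_ID);
     }

     // Deterministic for a given name, so re-uploading a segment maps it to the same id.
     static int fromSegmentName(String segmentName) {
       return fromHash(segmentName.hashCode());
     }
   }
   ```

   `Math.floorMod(segmentName.hashCode(), 10000)` would also stay in [0, 10000); it maps negative hashes to different ids than `Math.abs(... % 10000)`, which is fine as long as one formula is used consistently.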





[GitHub] [pinot] sajjad-moradi commented on a diff in pull request #8584: Enable uploading segments to realtime tables

Posted by GitBox <gi...@apache.org>.
sajjad-moradi commented on code in PR #8584:
URL: https://github.com/apache/pinot/pull/8584#discussion_r862282454


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java:
##########
@@ -158,7 +153,8 @@ private List<String> assignConsumingSegment(String segmentName, InstancePartitio
       int numInstances = instances.size();
       List<String> instancesAssigned = new ArrayList<>(_replication);
       for (int replicaId = 0; replicaId < _replication; replicaId++) {
-        instancesAssigned.add(instances.get((partitionGroupId * _replication + replicaId) % numInstances));
+        int instanceIndex = Math.abs(partitionGroupId * _replication + replicaId) % numInstances;

Review Comment:
   If partitionGroupId is a large number, `partitionGroupId * _replication + replicaId` can overflow and wrap around to a negative number. That's why I put Math.abs() around it. That said, if we bound partitionGroupId to a smaller number (like the `% 10000` Jackie is suggesting), we can remove the Math.abs() here.
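
   The overflow described above, sketched with hypothetical method names. With an unbounded id, `partitionGroupId * _replication + replicaId` can wrap past `Integer.MAX_VALUE`; `Math.abs()` keeps the index non-negative in most of those cases, but not when the sum is exactly `Integer.MIN_VALUE`. Once the id is bounded to [0, 10000) (the bound assumed here), the sum stays far below `Integer.MAX_VALUE` for any realistic replication factor, and the plain modulo is already non-negative:

   ```java
   // Hypothetical illustration of the instance-index computation in assignConsumingSegment.
   final class InstanceIndex {
     private InstanceIndex() {
     }

     // Unbounded id: the product may overflow int and yield a negative index.
     static int unguarded(int partitionGroupId, int replication, int replicaId, int numInstances) {
       return (partitionGroupId * replication + replicaId) % numInstances;
     }

     // Bounded id in [0, 10000): no overflow for small replication factors, so no Math.abs() is needed.
     static int bounded(int partitionGroupId, int replication, int replicaId, int numInstances) {
       if (partitionGroupId < 0 || partitionGroupId >= 10000) {
         throw new IllegalArgumentException("partitionGroupId out of range: " + partitionGroupId);
       }
       return (partitionGroupId * replication + replicaId) % numInstances;
     }
   }
   ```

   For example, `unguarded(1_000_000_000, 3, 0, 5)` computes `3_000_000_000`, which wraps to `-1_294_967_296` and yields index `-1`.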


