You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2022/02/28 22:31:08 UTC

[GitHub] [pinot] Jackie-Jiang commented on a change in pull request #8255: Added multi column partitioning for offline table

Jackie-Jiang commented on a change in pull request #8255:
URL: https://github.com/apache/pinot/pull/8255#discussion_r816292571



##########
File path: pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/PartitionSegmentPruner.java
##########
@@ -54,18 +57,19 @@
  */
 public class PartitionSegmentPruner implements SegmentPruner {
   private static final Logger LOGGER = LoggerFactory.getLogger(PartitionSegmentPruner.class);
-  private static final PartitionInfo INVALID_PARTITION_INFO = new PartitionInfo(null, null);
+  private static final Map<String, PartitionInfo> INVALID_COLUMN_PARTITION_INFO =
+      Collections.singletonMap("null", new PartitionInfo(null, null));

Review comment:
       (minor)
   ```suggestion
     private static final Map<String, PartitionInfo> INVALID_COLUMN_PARTITION_INFO_MAP = Collections.emptyMap();
   ```

##########
File path: pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/PartitionSegmentPruner.java
##########
@@ -239,25 +260,29 @@ private boolean isPartitionMatch(Expression filterExpression, PartitionInfo part
   }
 
   @Deprecated
-  private boolean isPartitionMatch(FilterQueryTree filterQueryTree, PartitionInfo partitionInfo) {
+  private boolean isPartitionMatch(FilterQueryTree filterQueryTree, Map<String, PartitionInfo> columnPartitionInfo) {
     switch (filterQueryTree.getOperator()) {
       case AND:
         for (FilterQueryTree child : filterQueryTree.getChildren()) {
-          if (!isPartitionMatch(child, partitionInfo)) {
+          if (!isPartitionMatch(child, columnPartitionInfo)) {
             return false;
           }
         }
         return true;
       case OR:
         for (FilterQueryTree child : filterQueryTree.getChildren()) {
-          if (isPartitionMatch(child, partitionInfo)) {
+          if (isPartitionMatch(child, columnPartitionInfo)) {
             return true;
           }
         }
         return false;
       case EQUALITY:
       case IN:
-        if (filterQueryTree.getColumn().equals(_partitionColumn)) {
+        if (_partitionColumns.contains(filterQueryTree.getColumn())) {

Review comment:
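       This check is redundant: looking the column up in `columnPartitionInfo` already tells us whether it is a partition column, so the separate `_partitionColumns.contains(...)` test can be dropped.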

##########
File path: pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/PartitionSegmentPruner.java
##########
@@ -83,20 +87,22 @@ public void init(IdealState idealState, ExternalView externalView, Set<String> o
     List<ZNRecord> znRecords = _propertyStore.get(segmentZKMetadataPaths, null, AccessOption.PERSISTENT, false);
     for (int i = 0; i < numSegments; i++) {
       String segment = segments.get(i);
-      PartitionInfo partitionInfo = extractPartitionInfoFromSegmentZKMetadataZNRecord(segment, znRecords.get(i));
-      if (partitionInfo != null) {
-        _partitionInfoMap.put(segment, partitionInfo);
+      Map<String, PartitionInfo> columnPartitionInfo =

Review comment:
       (minor) rename it to `columnPartitionInfoMap`

##########
File path: pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/PartitionSegmentPruner.java
##########
@@ -83,20 +87,22 @@ public void init(IdealState idealState, ExternalView externalView, Set<String> o
     List<ZNRecord> znRecords = _propertyStore.get(segmentZKMetadataPaths, null, AccessOption.PERSISTENT, false);
     for (int i = 0; i < numSegments; i++) {
       String segment = segments.get(i);
-      PartitionInfo partitionInfo = extractPartitionInfoFromSegmentZKMetadataZNRecord(segment, znRecords.get(i));
-      if (partitionInfo != null) {
-        _partitionInfoMap.put(segment, partitionInfo);
+      Map<String, PartitionInfo> columnPartitionInfo =
+          extractPartitionInfoFromSegmentZKMetadataZNRecord(segment, znRecords.get(i));
+      if (columnPartitionInfo != null) {
+        _segmentColumnPartitionInfoMap.put(segment, columnPartitionInfo);
       }
     }
   }
 
   /**
    * NOTE: Returns {@code null} when the ZNRecord is missing (could be transient Helix issue). Returns
-   *       {@link #INVALID_PARTITION_INFO} when the segment does not have valid partition metadata in its ZK metadata,
-   *       in which case we won't retry later.
+   *       {@link #INVALID_COLUMN_PARTITION_INFO} when the segment does not have valid partition metadata in its ZK
+   *       metadata, in which case we won't retry later.
    */
   @Nullable
-  private PartitionInfo extractPartitionInfoFromSegmentZKMetadataZNRecord(String segment, @Nullable ZNRecord znRecord) {
+  private Map<String, PartitionInfo> extractPartitionInfoFromSegmentZKMetadataZNRecord(String segment,

Review comment:
       (minor)
   ```suggestion
     private Map<String, PartitionInfo> extractColumnPartitionInfoMapFromSegmentZKMetadataZNRecord(String segment,
   ```

##########
File path: pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/PartitionSegmentPruner.java
##########
@@ -114,20 +120,25 @@ private PartitionInfo extractPartitionInfoFromSegmentZKMetadataZNRecord(String s
     } catch (Exception e) {
       LOGGER.warn("Caught exception while extracting segment partition metadata for segment: {}, table: {}", segment,
           _tableNameWithType, e);
-      return INVALID_PARTITION_INFO;
+      return INVALID_COLUMN_PARTITION_INFO;
     }
 
-    ColumnPartitionMetadata columnPartitionMetadata =
-        segmentPartitionMetadata.getColumnPartitionMap().get(_partitionColumn);
-    if (columnPartitionMetadata == null) {
-      LOGGER.warn("Failed to find column partition metadata for column: {}, segment: {}, table: {}", _partitionColumn,
-          segment, _tableNameWithType);
-      return INVALID_PARTITION_INFO;
+    Map<String, PartitionInfo> columnPartitionInfo = new HashMap<>();

Review comment:
       Suggest returning a singleton map when there is only one entry, which speeds up the map lookup.

##########
File path: pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/PartitionSegmentPruner.java
##########
@@ -165,9 +176,9 @@ public synchronized void refreshSegment(String segment) {
       }
       Set<String> selectedSegments = new HashSet<>();
       for (String segment : segments) {
-        PartitionInfo partitionInfo = _partitionInfoMap.get(segment);
-        if (partitionInfo == null || partitionInfo == INVALID_PARTITION_INFO || isPartitionMatch(filterExpression,
-            partitionInfo)) {
+        Map<String, PartitionInfo> columnPartitionInfo = _segmentColumnPartitionInfoMap.get(segment);

Review comment:
       (minor) Rename it to `columnPartitionInfoMap`

##########
File path: pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/PartitionSegmentPruner.java
##########
@@ -180,47 +191,57 @@ public synchronized void refreshSegment(String segment) {
       }
       Set<String> selectedSegments = new HashSet<>();
       for (String segment : segments) {
-        PartitionInfo partitionInfo = _partitionInfoMap.get(segment);
-        if (partitionInfo == null || partitionInfo == INVALID_PARTITION_INFO || isPartitionMatch(filterQueryTree,
-            partitionInfo)) {
+        Map<String, PartitionInfo> columnPartitionInfo = _segmentColumnPartitionInfoMap.get(segment);
+        if (columnPartitionInfo == null || columnPartitionInfo == INVALID_COLUMN_PARTITION_INFO || isPartitionMatch(
+            filterQueryTree, columnPartitionInfo)) {
           selectedSegments.add(segment);
         }
       }
       return selectedSegments;
     }
   }
 
-  private boolean isPartitionMatch(Expression filterExpression, PartitionInfo partitionInfo) {
+  @VisibleForTesting
+  public Set<String> getPartitionColumns() {
+    return _partitionColumns;
+  }
+
+  private boolean isPartitionMatch(Expression filterExpression, Map<String, PartitionInfo> columnPartitionInfo) {
     Function function = filterExpression.getFunctionCall();
     FilterKind filterKind = FilterKind.valueOf(function.getOperator());
     List<Expression> operands = function.getOperands();
     switch (filterKind) {
       case AND:
         for (Expression child : operands) {
-          if (!isPartitionMatch(child, partitionInfo)) {
+          if (!isPartitionMatch(child, columnPartitionInfo)) {
             return false;
           }
         }
         return true;
       case OR:
         for (Expression child : operands) {
-          if (isPartitionMatch(child, partitionInfo)) {
+          if (isPartitionMatch(child, columnPartitionInfo)) {
             return true;
           }
         }
         return false;
       case EQUALS: {
         Identifier identifier = operands.get(0).getIdentifier();
-        if (identifier != null && identifier.getName().equals(_partitionColumn)) {
-          return partitionInfo._partitions.contains(
-              partitionInfo._partitionFunction.getPartition(operands.get(1).getLiteral().getFieldValue().toString()));
+        if (identifier != null && _partitionColumns.contains(identifier.getName())) {

Review comment:
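       Save one extra lookup: `columnPartitionInfo.get(...)` already returns `null` for a column that is not a partition column, so the `_partitionColumns.contains(...)` check is not needed.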
   ```suggestion
           if (identifier != null) {
   ```

##########
File path: pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/PartitionSegmentPruner.java
##########
@@ -180,47 +191,57 @@ public synchronized void refreshSegment(String segment) {
       }
       Set<String> selectedSegments = new HashSet<>();
       for (String segment : segments) {
-        PartitionInfo partitionInfo = _partitionInfoMap.get(segment);
-        if (partitionInfo == null || partitionInfo == INVALID_PARTITION_INFO || isPartitionMatch(filterQueryTree,
-            partitionInfo)) {
+        Map<String, PartitionInfo> columnPartitionInfo = _segmentColumnPartitionInfoMap.get(segment);
+        if (columnPartitionInfo == null || columnPartitionInfo == INVALID_COLUMN_PARTITION_INFO || isPartitionMatch(
+            filterQueryTree, columnPartitionInfo)) {
           selectedSegments.add(segment);
         }
       }
       return selectedSegments;
     }
   }
 
-  private boolean isPartitionMatch(Expression filterExpression, PartitionInfo partitionInfo) {
+  @VisibleForTesting
+  public Set<String> getPartitionColumns() {
+    return _partitionColumns;
+  }
+
+  private boolean isPartitionMatch(Expression filterExpression, Map<String, PartitionInfo> columnPartitionInfo) {
     Function function = filterExpression.getFunctionCall();
     FilterKind filterKind = FilterKind.valueOf(function.getOperator());
     List<Expression> operands = function.getOperands();
     switch (filterKind) {
       case AND:
         for (Expression child : operands) {
-          if (!isPartitionMatch(child, partitionInfo)) {
+          if (!isPartitionMatch(child, columnPartitionInfo)) {
             return false;
           }
         }
         return true;
       case OR:
         for (Expression child : operands) {
-          if (isPartitionMatch(child, partitionInfo)) {
+          if (isPartitionMatch(child, columnPartitionInfo)) {
             return true;
           }
         }
         return false;
       case EQUALS: {
         Identifier identifier = operands.get(0).getIdentifier();
-        if (identifier != null && identifier.getName().equals(_partitionColumn)) {
-          return partitionInfo._partitions.contains(
-              partitionInfo._partitionFunction.getPartition(operands.get(1).getLiteral().getFieldValue().toString()));
+        if (identifier != null && _partitionColumns.contains(identifier.getName())) {
+          PartitionInfo partitionInfo = columnPartitionInfo.get(identifier.getName());
+          return partitionInfo == null || partitionInfo._partitions.contains(
+              partitionInfo._partitionFunction.getPartition(operands.get(1).getLiteral().getFieldValue()));
         } else {
           return true;
         }
       }
       case IN: {
         Identifier identifier = operands.get(0).getIdentifier();
-        if (identifier != null && identifier.getName().equals(_partitionColumn)) {
+        if (identifier != null && _partitionColumns.contains(identifier.getName())) {

Review comment:
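       Same as the `EQUALS` case: the `_partitionColumns.contains(...)` check can be dropped here as well.
       ```suggestion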
           if (identifier != null) {
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org