You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2021/06/18 21:24:12 UTC

[GitHub] [incubator-pinot] chenboat commented on a change in pull request #6899: Add partial upsert config and mergers

chenboat commented on a change in pull request #6899:
URL: https://github.com/apache/incubator-pinot/pull/6899#discussion_r654067697



##########
File path: pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
##########
@@ -60,128 +63,170 @@
   private final String _tableNameWithType;
   private final int _partitionId;
   private final ServerMetrics _serverMetrics;
+  private final PartialUpsertHandler _partialUpsertHandler;
 
-  public PartitionUpsertMetadataManager(String tableNameWithType, int partitionId, ServerMetrics serverMetrics) {
+  // TODO(upset): consider an off-heap KV store to persist this index to improve the recovery speed.

Review comment:
       s/upset/upsert/

##########
File path: pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java
##########
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.upsert;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.LiveInstance;
+import org.apache.pinot.segment.local.upsert.merger.PartialUpsertMerger;
+import org.apache.pinot.segment.local.upsert.merger.PartialUpsertMergerFactory;
+import org.apache.pinot.spi.config.table.UpsertConfig;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Handler for partial-upsert.
+ */
+public class PartialUpsertHandler {
+  private static final Logger LOGGER = LoggerFactory.getLogger(PartialUpsertHandler.class);
+
+  private final Map<String, PartialUpsertMerger> _mergers = new HashMap<>();
+
+  private final HelixManager _helixManager;
+  private final String _tableNameWithType;
+  private boolean _allSegmentsLoaded;
+
+  public PartialUpsertHandler(HelixManager helixManager, String tableNameWithType,
+      Map<String, UpsertConfig.Strategy> partialUpsertStrategies) {
+    _helixManager = helixManager;
+    _tableNameWithType = tableNameWithType;
+    for (Map.Entry<String, UpsertConfig.Strategy> entry : partialUpsertStrategies.entrySet()) {
+      _mergers.put(entry.getKey(), PartialUpsertMergerFactory.getMerger(entry.getValue()));
+    }
+  }
+
+  /**
+   * Returns {@code true} if all segments assigned to the current instance are loaded, {@code false} otherwise.
+   * Consuming segment should perform this check to ensure all previous records are loaded before inserting new records.
+   */
+  public synchronized boolean isAllSegmentsLoaded() {
+    if (_allSegmentsLoaded) {
+      return true;
+    }
+
+    HelixDataAccessor dataAccessor = _helixManager.getHelixDataAccessor();
+    PropertyKey.Builder keyBuilder = dataAccessor.keyBuilder();
+    IdealState idealState = dataAccessor.getProperty(keyBuilder.idealStates(_tableNameWithType));
+    if (idealState == null) {
+      LOGGER.warn("Failed to find ideal state for table: {}", _tableNameWithType);
+      return false;
+    }
+    String instanceName = _helixManager.getInstanceName();
+    LiveInstance liveInstance = dataAccessor.getProperty(keyBuilder.liveInstance(instanceName));
+    if (liveInstance == null) {
+      LOGGER.warn("Failed to find live instance for instance: {}", instanceName);
+      return false;
+    }
+    String sessionId = liveInstance.getEphemeralOwner();
+    CurrentState currentState =
+        dataAccessor.getProperty(keyBuilder.currentState(instanceName, sessionId, _tableNameWithType));
+    if (currentState == null) {
+      LOGGER.warn("Failed to find current state for instance: {}, sessionId: {}, table: {}", instanceName, sessionId,
+          _tableNameWithType);
+      return false;
+    }
+
+    // Check if ideal state and current state matches for all segments assigned to the current instance
+    Map<String, Map<String, String>> idealStatesMap = idealState.getRecord().getMapFields();
+    Map<String, String> currentStateMap = currentState.getPartitionStateMap();
+    for (Map.Entry<String, Map<String, String>> entry : idealStatesMap.entrySet()) {
+      String segmentName = entry.getKey();
+      Map<String, String> instanceStateMap = entry.getValue();
+      String expectedState = instanceStateMap.get(instanceName);
+      // Only track ONLINE segments assigned to the current instance
+      if (!SegmentStateModel.ONLINE.equals(expectedState)) {
+        continue;
+      }
+      String actualState = currentStateMap.get(segmentName);
+      if (!SegmentStateModel.ONLINE.equals(actualState)) {
+        if (SegmentStateModel.ERROR.equals(actualState)) {
+          LOGGER
+              .error("Find ERROR segment: {}, table: {}, expected: {}", segmentName, _tableNameWithType, expectedState);
+        } else {
+          LOGGER.info("Find unloaded segment: {}, table: {}, expected: {}, actual: {}", segmentName, _tableNameWithType,
+              expectedState, actualState);
+        }
+        return false;
+      }
+    }
+
+    LOGGER.info("All segments loaded for table: {}", _tableNameWithType);
+    _allSegmentsLoaded = true;
+    return true;
+  }
+
+  /**
+   * Merges 2 records and returns the merged record.
+   *
+   * @param previousRecord the last derived full record during ingestion.
+   * @param newRecord the new consumed record.
+   * @return a new row after merge
+   */
+  public GenericRow merge(GenericRow previousRecord, GenericRow newRecord) {
+    for (Map.Entry<String, PartialUpsertMerger> entry : _mergers.entrySet()) {
+      String column = entry.getKey();
+      if (!previousRecord.isNullValue(column)) {

Review comment:
       Should null-value handling for the two records be part of the merge strategy? Why does this logic appear here rather than being delegated to each merge strategy?

##########
File path: pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
##########
@@ -60,128 +63,170 @@
   private final String _tableNameWithType;
   private final int _partitionId;
   private final ServerMetrics _serverMetrics;
+  private final PartialUpsertHandler _partialUpsertHandler;
 
-  public PartitionUpsertMetadataManager(String tableNameWithType, int partitionId, ServerMetrics serverMetrics) {
+  // TODO(upset): consider an off-heap KV store to persist this index to improve the recovery speed.

Review comment:
       Which index? Can you be more specific?

##########
File path: pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
##########
@@ -60,128 +63,170 @@
   private final String _tableNameWithType;
   private final int _partitionId;
   private final ServerMetrics _serverMetrics;
+  private final PartialUpsertHandler _partialUpsertHandler;
 
-  public PartitionUpsertMetadataManager(String tableNameWithType, int partitionId, ServerMetrics serverMetrics) {
+  // TODO(upset): consider an off-heap KV store to persist this index to improve the recovery speed.
+  @VisibleForTesting
+  final ConcurrentHashMap<PrimaryKey, RecordLocation> _primaryKeyToRecordLocationMap = new ConcurrentHashMap<>();
+
+  // Reused for reading previous record during partial upsert
+  private final GenericRow _reuse = new GenericRow();
+  // Stores the result of updateRecord()
+  private GenericRow _result;
+
+  public PartitionUpsertMetadataManager(String tableNameWithType, int partitionId, ServerMetrics serverMetrics,
+      @Nullable PartialUpsertHandler partialUpsertHandler) {
     _tableNameWithType = tableNameWithType;
     _partitionId = partitionId;
     _serverMetrics = serverMetrics;
+    _partialUpsertHandler = partialUpsertHandler;
   }
 
-  public ConcurrentHashMap<PrimaryKey, RecordLocation> getPrimaryKeyToRecordLocationMap() {
-    return _primaryKeyToRecordLocationMap;
-  }
-
-  // TODO(upset): consider an off-heap KV store to persist this index to improve the recovery speed.
-  @VisibleForTesting
-  final ConcurrentHashMap<PrimaryKey, RecordLocation> _primaryKeyToRecordLocationMap = new ConcurrentHashMap<>();
-
   /**
-   * Initializes the upsert metadata for the given immutable segment, returns the valid doc ids for the segment.
+   * Initializes the upsert metadata for the given immutable segment.
    */
-  public ThreadSafeMutableRoaringBitmap addSegment(String segmentName, Iterator<RecordInfo> recordInfoIterator) {
+  public void addSegment(IndexSegment segment, Iterator<RecordInfo> recordInfoIterator) {
+    String segmentName = segment.getSegmentName();
     LOGGER.info("Adding upsert metadata for segment: {}", segmentName);
+    ThreadSafeMutableRoaringBitmap validDocIds = segment.getValidDocIds();
+    assert validDocIds != null;
 
-    ThreadSafeMutableRoaringBitmap validDocIds = new ThreadSafeMutableRoaringBitmap();
     while (recordInfoIterator.hasNext()) {
       RecordInfo recordInfo = recordInfoIterator.next();
       _primaryKeyToRecordLocationMap.compute(recordInfo._primaryKey, (primaryKey, currentRecordLocation) -> {
         if (currentRecordLocation != null) {
           // Existing primary key
 
-          if (segmentName.equals(currentRecordLocation.getSegmentName())) {
-            // The current record location has the same segment name
-
-            // Update the record location when the new timestamp is greater than or equal to the current timestamp.
-            // There are 2 scenarios:
-            //   1. The current record location is pointing to the same segment (the segment being added). In this case,
-            //      we want to update the record location when there is a tie to keep the newer record. Note that the
-            //      record info iterator will return records with incremental doc ids.
-            //   2. The current record location is pointing to the old segment being replaced. This could happen when
-            //      committing a consuming segment, or reloading a completed segment. In this case, we want to update
-            //      the record location when there is a tie because the record locations should point to the new added
-            //      segment instead of the old segment being replaced. Also, do not update the valid doc ids for the old
-            //      segment because it has not been replaced yet.
+          // The current record is in the same segment
+          // Update the record location when there is a tie to keep the newer record. Note that the record info iterator
+          // will return records with incremental doc ids.
+          IndexSegment currentSegment = currentRecordLocation.getSegment();
+          if (segment == currentSegment) {
             if (recordInfo._timestamp >= currentRecordLocation.getTimestamp()) {
-              // Only update the valid doc ids for the new segment
-              if (validDocIds == currentRecordLocation.getValidDocIds()) {
-                validDocIds.remove(currentRecordLocation.getDocId());
-              }
+              validDocIds.remove(currentRecordLocation.getDocId());
               validDocIds.add(recordInfo._docId);
-              return new RecordLocation(segmentName, recordInfo._docId, recordInfo._timestamp, validDocIds);
+              return new RecordLocation(segment, recordInfo._docId, recordInfo._timestamp);
             } else {
               return currentRecordLocation;
             }
-          } else {
-            // The current record location is pointing to a different segment
-
-            // Update the record location when getting a newer timestamp, or the timestamp is the same as the current
-            // timestamp, but the segment has a larger sequence number (the segment is newer than the current segment).
-            if (recordInfo._timestamp > currentRecordLocation.getTimestamp() || (
-                recordInfo._timestamp == currentRecordLocation.getTimestamp()
-                    && LLCSegmentName.isLowLevelConsumerSegmentName(segmentName)
-                    && LLCSegmentName.isLowLevelConsumerSegmentName(currentRecordLocation.getSegmentName())
-                    && LLCSegmentName.getSequenceNumber(segmentName) > LLCSegmentName
-                    .getSequenceNumber(currentRecordLocation.getSegmentName()))) {
-              currentRecordLocation.getValidDocIds().remove(currentRecordLocation.getDocId());
+          }
+
+          // The current record is in an old segment being replaced

Review comment:
       What do you mean by a segment being replaced? Is it being deleted for retention reasons?

##########
File path: pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMerger.java
##########
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.upsert.merger;
+
+public interface PartialUpsertMerger {
+  /**
+   * Handle partial upsert merge.
+   *
+   * @param previousValue the last derived full record during ingestion.
+   * @param currentValue the new consumed record.

Review comment:
       Does the current record need to be a full record? In general, I do not understand why we should have a constraint that either previousValue or currentValue must be a full record. A merger should handle all cases.

##########
File path: pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManagerTest.java
##########
@@ -44,36 +46,36 @@
   @Test
   public void testAddSegment() {
     PartitionUpsertMetadataManager upsertMetadataManager =
-        new PartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Mockito.mock(ServerMetrics.class));
-    Map<PrimaryKey, RecordLocation> recordLocationMap = upsertMetadataManager.getPrimaryKeyToRecordLocationMap();
+        new PartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, mock(ServerMetrics.class), null);

Review comment:
       Can you add tests to this class for partial upsert?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org