Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2020/10/21 18:08:05 UTC

[GitHub] [incubator-pinot] Jackie-Jiang commented on a change in pull request #6167: Support reloading upsert table

Jackie-Jiang commented on a change in pull request #6167:
URL: https://github.com/apache/incubator-pinot/pull/6167#discussion_r509535850



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/upsert/PartitionUpsertMetadataManager.java
##########
@@ -29,97 +30,142 @@
 
 /**
  * Manages the upsert metadata per partition.
+ * <p>For multiple records with the same timestamp, there is no guarantee which record will be preserved.
+ * <p>There will be short-term inconsistency when updating the upsert metadata, but the metadata should be consistent
+ * after the operation is done:
+ * <ul>
+ *   <li>
+ *     When updating a new record, it first removes the doc id from the current location, then updates the new location.
+ *   </li>
+ *   <li>
+ *     When adding a new segment, it removes the doc ids from the current locations before the segment is added to
+ *     the RealtimeTableDataManager.
+ *   </li>
+ *   <li>
+ *     When replacing an existing segment, the updates applied to the new segment won't be reflected in the replaced
+ *     segment.
+ *   </li>
+ * </ul>
  */
 @ThreadSafe
 public class PartitionUpsertMetadataManager {
   private static final Logger LOGGER = LoggerFactory.getLogger(PartitionUpsertMetadataManager.class);
 
   // TODO(upsert): consider an off-heap KV store to persist this index to improve the recovery speed.
-  private final ConcurrentHashMap<PrimaryKey, RecordLocation> _primaryKeyToRecordLocationMap =
-      new ConcurrentHashMap<>();
-  // the mapping between the (sealed) segment and its validDocuments
-  private final ConcurrentHashMap<String, ThreadSafeMutableRoaringBitmap> _segmentToValidDocIdsMap =
-      new ConcurrentHashMap<>();
+  @VisibleForTesting
+  final ConcurrentHashMap<PrimaryKey, RecordLocation> _primaryKeyToRecordLocationMap = new ConcurrentHashMap<>();
 
   /**
-   * Creates the valid doc ids for the given (immutable) segment.
+   * Initializes the upsert metadata for the given immutable segment and returns the valid doc ids for the segment.
    */
-  public ThreadSafeMutableRoaringBitmap createValidDocIds(String segmentName) {
-    LOGGER.info("Creating valid doc ids for segment: {}", segmentName);
+  public ThreadSafeMutableRoaringBitmap addSegment(String segmentName, Iterator<RecordInfo> recordInfoIterator) {
+    LOGGER.info("Adding upsert metadata for segment: {}", segmentName);
+
     ThreadSafeMutableRoaringBitmap validDocIds = new ThreadSafeMutableRoaringBitmap();
-    if (_segmentToValidDocIdsMap.put(segmentName, validDocIds) != null) {
-      LOGGER.warn("Valid doc ids exist for segment: {}, replacing it", segmentName);
+    while (recordInfoIterator.hasNext()) {
+      RecordInfo recordInfo = recordInfoIterator.next();
+      _primaryKeyToRecordLocationMap.compute(recordInfo._primaryKey, (primaryKey, currentRecordLocation) -> {
+        if (currentRecordLocation != null) {
+          // Existing primary key
+
+          if (segmentName.equals(currentRecordLocation.getSegmentName())) {
+            // The current record location has the same segment name
+
+            if (validDocIds == currentRecordLocation.getValidDocIds()) {
+              // The current record location is pointing to the new segment being loaded
+
+              // Update the record location when getting a newer timestamp
+              if (recordInfo._timestamp > currentRecordLocation.getTimestamp()) {
+                validDocIds.remove(currentRecordLocation.getDocId());
+                validDocIds.add(recordInfo._docId);
+                return new RecordLocation(segmentName, recordInfo._docId, recordInfo._timestamp, validDocIds);
+              }
+            } else {
+              // The current record location is pointing to the old segment being replaced. This could happen when
+              // committing a consuming segment, or reloading a completed segment.
+
+              // Update the record location when the new timestamp is greater than or equal to the current timestamp.
+              // Update the record location when there is a tie because the record locations should point to the new
+              // segment instead of the old segment being replaced. Also, do not update the valid doc ids for the old
+              // segment because it has not been replaced yet.

Review comment:
      When we commit a consuming segment or reload a completed segment, the data will be identical (though it could be re-ordered). If we updated the valid doc ids of the old segment before replacing it in the data manager, all the docs in the old segment would be invalidated. Even though this recovers once the segment is replaced, we would observe data loss until then.
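To make the data-loss window concrete, here is a minimal, single-threaded sketch of the tie-handling rule from the diff above. It is not the Pinot implementation: `RecordInfo` and `RecordLocation` are simplified, hypothetical stand-ins, `java.util.BitSet` replaces `ThreadSafeMutableRoaringBitmap`, and keys owned by other segments (a branch not shown in the excerpt) are deliberately out of scope. The hypothetical `eagerInvalidate` flag models the alternative argued against in the comment, where a tie also clears the doc id from the old segment's valid doc ids.

```java
import java.util.BitSet;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

// Sketch only: simplified stand-ins for Pinot classes; BitSet replaces ThreadSafeMutableRoaringBitmap.
public class UpsertReloadSketch {
  static final class RecordInfo {
    final String primaryKey;
    final int docId;
    final long timestamp;

    RecordInfo(String primaryKey, int docId, long timestamp) {
      this.primaryKey = primaryKey;
      this.docId = docId;
      this.timestamp = timestamp;
    }
  }

  static final class RecordLocation {
    final String segmentName;
    final int docId;
    final long timestamp;
    final BitSet validDocIds;

    RecordLocation(String segmentName, int docId, long timestamp, BitSet validDocIds) {
      this.segmentName = segmentName;
      this.docId = docId;
      this.timestamp = timestamp;
      this.validDocIds = validDocIds;
    }
  }

  final ConcurrentHashMap<String, RecordLocation> primaryKeyToRecordLocationMap = new ConcurrentHashMap<>();

  // eagerInvalidate (hypothetical) = true: on a tie with the old segment, also clear its valid doc id.
  BitSet addSegment(String segmentName, List<RecordInfo> records, boolean eagerInvalidate) {
    BitSet validDocIds = new BitSet();
    for (RecordInfo recordInfo : records) {
      primaryKeyToRecordLocationMap.compute(recordInfo.primaryKey, (primaryKey, current) -> {
        if (current == null) {
          // New primary key: the record is valid in the segment being loaded.
          validDocIds.set(recordInfo.docId);
          return new RecordLocation(segmentName, recordInfo.docId, recordInfo.timestamp, validDocIds);
        }
        if (!segmentName.equals(current.segmentName)) {
          // Keys owned by other segments are not covered by the excerpt; out of scope for this sketch.
          throw new UnsupportedOperationException("Key owned by another segment: " + primaryKey);
        }
        if (current.validDocIds == validDocIds) {
          // Duplicate key within the segment being loaded: keep the newer timestamp.
          if (recordInfo.timestamp > current.timestamp) {
            validDocIds.clear(current.docId);
            validDocIds.set(recordInfo.docId);
            return new RecordLocation(segmentName, recordInfo.docId, recordInfo.timestamp, validDocIds);
          }
          return current;
        }
        // The location points to the old segment being replaced: ties move to the new segment.
        if (recordInfo.timestamp >= current.timestamp) {
          if (eagerInvalidate) {
            current.validDocIds.clear(current.docId);
          }
          validDocIds.set(recordInfo.docId);
          return new RecordLocation(segmentName, recordInfo.docId, recordInfo.timestamp, validDocIds);
        }
        return current;
      });
    }
    return validDocIds;
  }

  public static void main(String[] args) {
    for (boolean eager : new boolean[] {true, false}) {
      UpsertReloadSketch manager = new UpsertReloadSketch();
      BitSet oldValidDocIds = manager.addSegment("seg_0", List.of(
          new RecordInfo("k0", 0, 10), new RecordInfo("k1", 1, 20), new RecordInfo("k2", 2, 30)), eager);
      // Reload the same data in a different order while the old segment is still being served.
      BitSet newValidDocIds = manager.addSegment("seg_0", List.of(
          new RecordInfo("k2", 0, 30), new RecordInfo("k0", 1, 10), new RecordInfo("k1", 2, 20)), eager);
      System.out.println((eager ? "eager" : "deferred") + ": old=" + oldValidDocIds + " new=" + newValidDocIds);
    }
  }
}
```

Running `main` prints `eager: old={} new={0, 1, 2}` followed by `deferred: old={0, 1, 2} new={0, 1, 2}`. In the eager variant, every doc of the still-registered old segment is invalid until the swap, which is the data loss described in the comment. In the deferred variant, the old segment keeps serving its full set of valid docs until it is replaced.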




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


