You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2020/10/12 22:30:18 UTC

[GitHub] [incubator-pinot] mcvsubbu commented on a change in pull request #6113: Adding the upsert support to real-time ingestion and query

mcvsubbu commented on a change in pull request #6113:
URL: https://github.com/apache/incubator-pinot/pull/6113#discussion_r503515858



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
##########
@@ -132,6 +146,19 @@ protected void doInit() {
     String consumerDirPath = getConsumerDir();
     File consumerDir = new File(consumerDirPath);
 
+    TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, _tableNameWithType);

Review comment:
       The table config is available at the time TableDataManager is created. Can you use that instead of fetching it again here?
   Also, TableDataManager is not recreated if the table config changes. How do you plan to address that?

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
##########
@@ -1165,6 +1172,14 @@ public LLRealtimeSegmentDataManager(RealtimeSegmentZKMetadata segmentZKMetadata,
     Set<String> textIndexColumns = indexLoadingConfig.getTextIndexColumns();
     _textIndexColumns = new ArrayList<>(textIndexColumns);
 
+    PartitionUpsertMetadataManager partitionUpsertMetadataManager = null;
+    UpsertConfig.Mode upsertMode = _tableConfig.getUpsertMode();
+    if (_upsertMetadataTableManager != null && upsertMode != UpsertConfig.Mode.NONE) {
+      int partitionId = new LLCSegmentName(_segmentNameStr).getPartitionId();

Review comment:
       Please use the `_streamPartitionId` member variable.

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
##########
@@ -453,6 +465,49 @@ public boolean index(GenericRow row, @Nullable RowMetadata rowMetadata) {
     return canTakeMore;
   }
 
+  private boolean isUpsertEnabled() {
+    return _upsertMode != null && _upsertMode != UpsertConfig.Mode.NONE;
+  }
+
+  private void handleUpsert(GenericRow row, int docId) {
+    // below are upsert operations
+    PrimaryKey primaryKey = row.getPrimaryKey(_schema.getPrimaryKeyColumns());
+    Object timeValue = row.getValue(_timeColumnName);
+    Preconditions.checkArgument(timeValue instanceof Comparable, "time column shall be comparable");
+    long timestamp = IngestionUtils.extractTimeValue((Comparable) timeValue);
+    RecordLocation location = new RecordLocation(_segmentName, docId, timestamp);
+    // check local primary key index first
+    if (_primaryKeyIndex.containsKey(primaryKey)) {
+      RecordLocation prevLocation = _primaryKeyIndex.get(primaryKey);
+      if (location.getTimestamp() >= prevLocation.getTimestamp()) {
+        _primaryKeyIndex.put(primaryKey, location);
+        // update validDocIndex
+        _validDocIndex.remove(prevLocation.getDocId());
+        _validDocIndex.checkAndAdd(location.getDocId());
+        LOGGER.debug(String
+            .format("upsert: replace old doc id %d with %d for key: %s, hash: %d", prevLocation.getDocId(),
+                location.getDocId(), primaryKey, primaryKey.hashCode()));
+      } else {
+        LOGGER.debug(
+            String.format("upsert: ignore a late-arrived record: %s, hash: %d", primaryKey, primaryKey.hashCode()));
+      }
+    } else if (_partitionUpsertMetadataManager.containsKey(primaryKey)) {
+      RecordLocation prevLocation = _partitionUpsertMetadataManager.getRecordLocation(primaryKey);
+      if (location.getTimestamp() >= prevLocation.getTimestamp()) {
+        _partitionUpsertMetadataManager.removeRecordLocation(primaryKey);
+        _primaryKeyIndex.put(primaryKey, location);
+
+        // update validDocIndex
+        _partitionUpsertMetadataManager.getValidDocIndex(prevLocation.getSegmentName())
+            .remove(prevLocation.getDocId());
+        _validDocIndex.checkAndAdd(location.getDocId());
+      }
+    } else {
+      _primaryKeyIndex.put(primaryKey, location);
+      _validDocIndex.checkAndAdd(location.getDocId());
+    }

Review comment:
       +1

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
##########
@@ -266,14 +292,52 @@ public void addSegment(String segmentName, TableConfig tableConfig, IndexLoading
         manager =
             new LLRealtimeSegmentDataManager(realtimeSegmentZKMetadata, tableConfig, this, _indexDir.getAbsolutePath(),
                 indexLoadingConfig, schema, llcSegmentName, _partitionIdToSemaphoreMap.get(streamPartitionId),
-                _serverMetrics);
+                _serverMetrics, _tableUpsertMetadataManager);
       }
       _logger.info("Initialize RealtimeSegmentDataManager - " + segmentName);
       _segmentDataManagerMap.put(segmentName, manager);
       _serverMetrics.addValueToTableGauge(_tableNameWithType, ServerGauge.SEGMENT_COUNT, 1L);
     }
   }
 
+  private boolean isUpsertEnabled() {
+    return _upsertMode == UpsertConfig.Mode.FULL || _upsertMode == UpsertConfig.Mode.PARTIAL;
+  }
+
+  @Override
+  public void addSegment(ImmutableSegment immutableSegment) {
+    if (isUpsertEnabled()) {
+      handleUpsert(immutableSegment);
+    }
+    super.addSegment(immutableSegment);
+  }
+
+  private void handleUpsert(ImmutableSegment immutableSegment) {

Review comment:
       Can we move the upsert handling to a different class? Perhaps a subclass of RealtimeTableDataManager?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org