You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2020/10/07 06:00:21 UTC

[GitHub] [incubator-pinot] jamesyfshao commented on a change in pull request #6113: Adding the upsert support to real-time ingestion and query

jamesyfshao commented on a change in pull request #6113:
URL: https://github.com/apache/incubator-pinot/pull/6113#discussion_r500755582



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
##########
@@ -266,14 +291,59 @@ public void addSegment(String segmentName, TableConfig tableConfig, IndexLoading
         manager =
             new LLRealtimeSegmentDataManager(realtimeSegmentZKMetadata, tableConfig, this, _indexDir.getAbsolutePath(),
                 indexLoadingConfig, schema, llcSegmentName, _partitionIdToSemaphoreMap.get(streamPartitionId),
-                _serverMetrics);
+                _serverMetrics, _upsertMetadataTableManager);
       }
       _logger.info("Initialize RealtimeSegmentDataManager - " + segmentName);
       _segmentDataManagerMap.put(segmentName, manager);
       _serverMetrics.addValueToTableGauge(_tableNameWithType, ServerGauge.SEGMENT_COUNT, 1L);
     }
   }
 
+  private boolean isUpsertEnabled() {
+    return _upsertMode != null && _upsertMode == UpsertConfig.Mode.FULL || _upsertMode == UpsertConfig.Mode.PARTIAL;
+  }
+
+  @Override
+  public void addSegment(ImmutableSegment immutableSegment) {
+    if(isUpsertEnabled()) {
+      handleUpsert(immutableSegment);
+    }
+    super.addSegment(immutableSegment);
+  }
+
+  private void handleUpsert(ImmutableSegment immutableSegment) {
+    Preconditions.checkArgument(!_primaryKeyColumns.isEmpty(), "the primary key columns cannot be empty");
+    Map<String, PinotSegmentColumnReader> columnToReaderMap = new HashMap<>();
+    for (String primaryKeyColumn : _primaryKeyColumns) {
+      columnToReaderMap.put(primaryKeyColumn, new PinotSegmentColumnReader(immutableSegment, primaryKeyColumn));
+    }
+    columnToReaderMap.put(_timeColumnName, new PinotSegmentColumnReader(immutableSegment, _timeColumnName));
+    int numTotalDocs = immutableSegment.getSegmentMetadata().getTotalDocs();
+
+    // upsert metadata of the current segment
+    Map<PrimaryKey, RecordLocation> primaryKeyIndex = new HashMap<>();

Review comment:
       What's the estimated memory usage for the extra data structure? if the primary key+RecordLocation is around 1K, this means we will use 10G heap for this data structure for 1M records. We might want to be careful about part of memory usage




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org