Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2021/05/11 20:54:51 UTC

[GitHub] [incubator-pinot] yupeng9 commented on a change in pull request #6899: Add partial upsert config and mergers (WIP)

yupeng9 commented on a change in pull request #6899:
URL: https://github.com/apache/incubator-pinot/pull/6899#discussion_r630532638



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
##########
@@ -499,6 +515,29 @@ private boolean isUpsertEnabled() {
     return _upsertMode != UpsertConfig.Mode.NONE;
   }
 
+  private boolean isPartialUpsertEnabled() {
+    return _upsertMode == UpsertConfig.Mode.PARTIAL;
+  }
+
+  private GenericRow lookupAndMerge(GenericRow incomingRow, int docId) {
+    // get primary key and timestamp for the incoming record.
+    GenericRow previousRow = new GenericRow();
+    PrimaryKey primaryKey = incomingRow.getPrimaryKey(_schema.getPrimaryKeyColumns());
+    Object timeValue = incomingRow.getValue(_timeColumnName);
+    Preconditions.checkArgument(timeValue instanceof Comparable, "time column shall be comparable");
+    long timestamp = IngestionUtils.extractTimeValue((Comparable) timeValue);
+
+    // look up the previous full record with pk. Merge record if the incoming record is newer than previous record.
+    RecordLocation lastRecord = _partitionUpsertMetadataManager.findLastRecord(primaryKey);
+    if (timestamp >= lastRecord.getTimestamp()) {
+      previousRow = this.getRecord(lastRecord.getDocId(), previousRow);
+      return _partialUpsertHandler.merge(previousRow, incomingRow);

Review comment:
      Does this need some error handling?
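One possible shape for that error handling is to guard the merge so that a failing merger does not drop the incoming record. This is a minimal sketch, not the PR's code. It assumes rows are modeled as `Map<String, Object>` instead of Pinot's `GenericRow`, and it uses a hypothetical `RowMerger` interface as a stand-in for `_partialUpsertHandler`. Falling back to the unmerged incoming row is an illustrative policy choice.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch only: Map<String, Object> stands in for Pinot's GenericRow, and
// RowMerger is a hypothetical stand-in for the partial upsert handler.
final class SafeMergeSketch {

  interface RowMerger {
    Map<String, Object> merge(Map<String, Object> previous, Map<String, Object> incoming);
  }

  // Merges the incoming row on top of the previous one. If the merger throws or
  // returns null, keep the incoming row unchanged (hypothetical fallback policy)
  // instead of failing ingestion of the record.
  static Map<String, Object> mergeOrFallback(RowMerger merger, Map<String, Object> previous,
      Map<String, Object> incoming) {
    try {
      Map<String, Object> merged = merger.merge(previous, incoming);
      return merged != null ? merged : incoming;
    } catch (RuntimeException e) {
      // A real implementation would likely log the error and emit a metric here.
      return incoming;
    }
  }

  // Helper that builds a single-column row for examples.
  static Map<String, Object> row(String column, Object value) {
    Map<String, Object> r = new HashMap<>();
    r.put(column, value);
    return r;
  }
}
```

Whether to fall back, skip the record, or rethrow is a design decision for the PR. The sketch only shows where such a guard would sit.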

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
##########
@@ -499,6 +515,29 @@ private boolean isUpsertEnabled() {
     return _upsertMode != UpsertConfig.Mode.NONE;
   }
 
+  private boolean isPartialUpsertEnabled() {
+    return _upsertMode == UpsertConfig.Mode.PARTIAL;
+  }
+
+  private GenericRow lookupAndMerge(GenericRow incomingRow, int docId) {
+    // get primary key and timestamp for the incoming record.
+    GenericRow previousRow = new GenericRow();
+    PrimaryKey primaryKey = incomingRow.getPrimaryKey(_schema.getPrimaryKeyColumns());
+    Object timeValue = incomingRow.getValue(_timeColumnName);
+    Preconditions.checkArgument(timeValue instanceof Comparable, "time column shall be comparable");
+    long timestamp = IngestionUtils.extractTimeValue((Comparable) timeValue);
+
+    // look up the previous full record with pk. Merge record if the incoming record is newer than previous record.
+    RecordLocation lastRecord = _partitionUpsertMetadataManager.findLastRecord(primaryKey);

Review comment:
      What if the last record does not exist?
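A sketch of handling a missing previous record follows. It assumes, without confirmation from the diff, that the lookup returns `null` when no record exists for the primary key. `RecordLocation` and the key-to-location map here are simplified, hypothetical stand-ins for Pinot's upsert metadata, and rows are modeled as `Map<String, Object>`. The behavior for out-of-order (older) records is also an assumption, because the hunk is cut off before that branch.

```java
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.IntFunction;

// Sketch only: RecordLocation and the key-to-location map are hypothetical,
// simplified stand-ins for Pinot's upsert metadata; rows are Map<String, Object>.
final class LookupAndMergeSketch {

  static final class RecordLocation {
    final int docId;
    final long timestamp;

    RecordLocation(int docId, long timestamp) {
      this.docId = docId;
      this.timestamp = timestamp;
    }
  }

  // Returns the row to index for an incoming record with the given key and timestamp.
  static Map<String, Object> lookupAndMerge(Map<String, RecordLocation> lastRecordByKey,
      IntFunction<Map<String, Object>> readRecord, BinaryOperator<Map<String, Object>> merger,
      String primaryKey, long timestamp, Map<String, Object> incoming) {
    RecordLocation last = lastRecordByKey.get(primaryKey);
    if (last == null) {
      // First record seen for this primary key: nothing to merge with.
      return incoming;
    }
    if (timestamp >= last.timestamp) {
      // Incoming record is at least as new: merge it on top of the previous full record.
      return merger.apply(readRecord.apply(last.docId), incoming);
    }
    // Older (out-of-order) record: returned unmerged; the caller decides how to treat it.
    return incoming;
  }
}
```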

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/RealtimeSegmentConfig.java
##########
@@ -205,6 +214,18 @@ public PartitionUpsertMetadataManager getPartitionUpsertMetadataManager() {
     return _partitionUpsertMetadataManager;
   }
 
+  public UpsertConfig.STRATEGY getGlobalUpsertStrategy() {

Review comment:
      Why do we need three strategies? I thought there was a single strategy field that could take different types, such as predefined or custom (in Groovy).
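The single-field alternative described in this comment could look roughly like the following. This is a minimal sketch with hypothetical class and field names that do not reflect Pinot's actual `UpsertConfig`. One `Strategy` value carries a type tag that says how to interpret it: the name of a built-in merger, or a custom script such as Groovy.

```java
import java.util.Objects;

// Sketch only: all names here are hypothetical and do not reflect Pinot's UpsertConfig.
// One strategy field, whose type tag says how to interpret its value.
final class StrategyConfigSketch {

  enum StrategyType { PREDEFINED, CUSTOM }

  static final class Strategy {
    final StrategyType type;
    // PREDEFINED: name of a built-in merger (the names used below are only examples).
    // CUSTOM: a script body, e.g. written in Groovy.
    final String value;

    Strategy(StrategyType type, String value) {
      this.type = Objects.requireNonNull(type, "type");
      this.value = Objects.requireNonNull(value, "value");
    }

    static Strategy predefined(String name) {
      return new Strategy(StrategyType.PREDEFINED, name);
    }

    static Strategy custom(String script) {
      return new Strategy(StrategyType.CUSTOM, script);
    }

    boolean isCustom() {
      return type == StrategyType.CUSTOM;
    }
  }
}
```

With this shape, a global default and any per-column overrides would all be values of the same `Strategy` type rather than separate config fields.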

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
##########
@@ -367,10 +372,16 @@ public long getLatestIngestionTimestamp() {
       _partitionUpsertMetadataManager = config.getPartitionUpsertMetadataManager();
       _validDocIds = new ThreadSafeMutableRoaringBitmap();
       _validDocIndex = new ValidDocIndexReaderImpl(_validDocIds);
+
+      // init partial upsert handler with partial upsert config

Review comment:
      We should init this only for partial mode.
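The conditional initialization suggested here can be sketched as below. It assumes the mode enum has `NONE`, `FULL`, and `PARTIAL` values. The diff only shows `NONE` and `PARTIAL`, so `FULL` is an assumption. `PartialUpsertHandler` is a hypothetical placeholder for the PR's handler class.

```java
// Sketch only: Mode assumes NONE/FULL/PARTIAL values (the diff shows NONE and PARTIAL),
// and PartialUpsertHandler is a hypothetical placeholder for the PR's handler class.
final class HandlerInitSketch {

  enum Mode { NONE, FULL, PARTIAL }

  static final class PartialUpsertHandler {
  }

  // Creates a handler only when partial upsert is enabled, and returns null otherwise,
  // so full-upsert and non-upsert segments never allocate one.
  static PartialUpsertHandler initPartialUpsertHandler(Mode mode) {
    return mode == Mode.PARTIAL ? new PartialUpsertHandler() : null;
  }
}
```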
