You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by "Jackie-Jiang (via GitHub)" <gi...@apache.org> on 2023/06/17 00:36:37 UTC

[GitHub] [pinot] Jackie-Jiang commented on a diff in pull request #10703: Add delete support to upsert tables

Jackie-Jiang commented on code in PR #10703:
URL: https://github.com/apache/pinot/pull/10703#discussion_r1232890455


##########
pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java:
##########
@@ -409,7 +416,62 @@ protected TableConfig createUpsertTableConfig(File sampleAvroFile, String primar
         .setRoutingConfig(new RoutingConfig(null, null, RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
         .setSegmentPartitionConfig(new SegmentPartitionConfig(columnPartitionConfigMap))
         .setReplicaGroupStrategyConfig(new ReplicaGroupStrategyConfig(primaryKeyColumn, 1))
-        .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL)).build();
+        .setUpsertConfig(upsertConfig).build();
+  }
+
+

Review Comment:
   (nit) extra empty line



##########
pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java:
##########
@@ -409,7 +416,62 @@ protected TableConfig createUpsertTableConfig(File sampleAvroFile, String primar
         .setRoutingConfig(new RoutingConfig(null, null, RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
         .setSegmentPartitionConfig(new SegmentPartitionConfig(columnPartitionConfigMap))
         .setReplicaGroupStrategyConfig(new ReplicaGroupStrategyConfig(primaryKeyColumn, 1))
-        .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL)).build();
+        .setUpsertConfig(upsertConfig).build();
+  }
+
+
+  protected Map<String, String> getCSVStreamConfigMap(@Nullable String delimiter, @Nullable String csvHeaderProperty) {
+    String streamType = "kafka";
+    Map<String, String> streamConfigsMap = new HashMap<>();
+    streamConfigsMap.put(
+        StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_DECODER_CLASS),
+        CSVMessageDecoder.class.getName());
+    if (delimiter != null) {
+      streamConfigsMap.put(StreamConfigProperties.constructStreamProperty(streamType, "decoder.prop.delimiter"),
+          delimiter);
+    }
+    if (csvHeaderProperty != null) {
+      streamConfigsMap.put(StreamConfigProperties.constructStreamProperty(streamType, "decoder.prop.header"),
+          csvHeaderProperty);
+    }
+    return streamConfigsMap;
+  }
+
+  /**
+   * Creates a new Upsert enabled table config.
+   */
+  protected TableConfig createCSVUpsertTableConfig(String tableName, @Nullable String schemaName,

Review Comment:
   Remove the redundant arguments and use `getTableName()`, `getSchemaName()` etc.



##########
pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java:
##########
@@ -388,6 +388,12 @@ public List<String> getUpsertComparisonColumns() {
     return _upsertConfig == null ? null : _upsertConfig.getComparisonColumns();
   }
 
+  @JsonIgnore
+  @Nullable
+  public String getUpsertDeleteRecordColumn() {

Review Comment:
   Let's make sure the naming is consistent: use either `deleteRecordColumn` or `deletedRecordColumn` everywhere. I personally prefer `deleteRecordColumn`, because the column is used to delete a record.



##########
pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java:
##########
@@ -498,33 +560,82 @@ protected void setUpQueryGenerator(List<File> avroFiles) {
     _queryGenerator = new QueryGenerator(avroFiles, tableName, tableName);
   }
 
+  protected List<File> unpackAvroData(File outputDir)
+      throws Exception {
+    return unpackTarData(getAvroTarFileName(), outputDir);
+  }
+
   /**
-   * Unpack the tarred Avro data into the given directory.
+   * Unpack the tarred data into the given directory.
    *
+   * @param tarFileName Input tar filename
    * @param outputDir Output directory
    * @return List of files unpacked.
    * @throws Exception
    */
-  protected List<File> unpackAvroData(File outputDir)
+  protected List<File> unpackTarData(String tarFileName, File outputDir)
       throws Exception {
     InputStream inputStream =
-        BaseClusterIntegrationTest.class.getClassLoader().getResourceAsStream(getAvroTarFileName());
+        BaseClusterIntegrationTest.class.getClassLoader().getResourceAsStream(tarFileName);
     Assert.assertNotNull(inputStream);
     return TarGzCompressionUtils.untar(inputStream, outputDir);
   }
-
   /**
    * Pushes the data in the given Avro files into a Kafka stream.
    *
    * @param avroFiles List of Avro files
    */
   protected void pushAvroIntoKafka(List<File> avroFiles)
       throws Exception {
-
     ClusterIntegrationTestUtils.pushAvroIntoKafka(avroFiles, "localhost:" + getKafkaPort(), getKafkaTopic(),
         getMaxNumKafkaMessagesPerBatch(), getKafkaMessageHeader(), getPartitionColumn(), injectTombstones());
   }
 
+  /**
+   * Pushes the data in the given Avro files into a Kafka stream.
+   *
+   * @param csvFile List of CSV strings
+   */
+  protected void pushCsvIntoKafka(File csvFile, String kafkaTopic, @Nullable Integer partitionColumnIndex) {
+    String kafkaBroker = "localhost:" + getKafkaPort();
+    StreamDataProducer producer = null;
+    try {
+      producer =
+        StreamDataProvider.getStreamDataProducer(KafkaStarterUtils.KAFKA_PRODUCER_CLASS_NAME,
+            getDefaultKafkaProducerProperties(kafkaBroker));
+      ClusterIntegrationTestUtils.pushCsvIntoKafka(csvFile, kafkaTopic, partitionColumnIndex, injectTombstones(),
+          producer);
+    } catch (Exception e) {

Review Comment:
   Suggest rethrowing the exception instead of catching it here. Currently the method will fail silently. The same applies to the other method.



##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentUploadIntegrationTest.java:
##########
@@ -74,7 +74,7 @@ public void setUp()
     // Create and upload schema and table config
     Schema schema = createSchema();
     addSchema(schema);
-    TableConfig tableConfig = createUpsertTableConfig(avroFiles.get(0), PRIMARY_KEY_COL, getNumKafkaPartitions());
+    TableConfig tableConfig = createUpsertTableConfig(avroFiles.get(0), PRIMARY_KEY_COL, null, getNumKafkaPartitions());

Review Comment:
   Consider merging this test into `UpsertTableIntegrationTest`. Essentially, we want to upload one segment and check that it works properly.



##########
pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java:
##########
@@ -409,7 +416,62 @@ protected TableConfig createUpsertTableConfig(File sampleAvroFile, String primar
         .setRoutingConfig(new RoutingConfig(null, null, RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
         .setSegmentPartitionConfig(new SegmentPartitionConfig(columnPartitionConfigMap))
         .setReplicaGroupStrategyConfig(new ReplicaGroupStrategyConfig(primaryKeyColumn, 1))
-        .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL)).build();
+        .setUpsertConfig(upsertConfig).build();
+  }
+
+
+  protected Map<String, String> getCSVStreamConfigMap(@Nullable String delimiter, @Nullable String csvHeaderProperty) {

Review Comment:
   This is not really the stream config. Suggest merging this into `createCSVUpsertTableConfig()`



##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java:
##########
@@ -82,7 +82,7 @@ public void testHardcodedQueriesMultiStage()
     super.testHardcodedQueriesMultiStage();
   }
 
-  @Test
+  @Test (enabled = false)

Review Comment:
   Remove this change (`enabled = false`).



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java:
##########
@@ -220,11 +255,19 @@ protected void doAddRecord(MutableSegment segment, RecordInfo recordInfo) {
   protected GenericRow doUpdateRecord(GenericRow record, RecordInfo recordInfo) {

Review Comment:
   Nice job handling the case where the previous record was deleted.
   
   When the new record is a delete record, we can skip reading the previous record and updating the new record. We still want to handle the out-of-order case, though. We can consider adding an `outOfOrderEvent` flag to replace the current `previousRecordDeleted` flag.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org