Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2021/06/28 04:47:36 UTC

[GitHub] [hudi] codope commented on a change in pull request #3158: [WIP][HUDI-1860] Add INSERT_OVERWRITE support to DeltaStreamer

codope commented on a change in pull request #3158:
URL: https://github.com/apache/hudi/pull/3158#discussion_r659470200



##########
File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
##########
@@ -1242,11 +1242,11 @@ private void verifyInsertOverwritePartitionHandling(int batch1RecordsCount, int
     List<HoodieRecord> insertsAndUpdates2 = new ArrayList<>();
     insertsAndUpdates2.addAll(inserts2);
     JavaRDD<HoodieRecord> insertAndUpdatesRDD2 = jsc.parallelize(insertsAndUpdates2, 2);
-    HoodieWriteResult writeResult = client.insertOverwrite(insertAndUpdatesRDD2, commitTime2);
-    statuses = writeResult.getWriteStatuses().collect();
+    JavaRDD<WriteStatus> writeStatusJavaRDD = client.insertOverwrite(insertAndUpdatesRDD2, commitTime2);
+    statuses = writeStatusJavaRDD.collect();
     assertNoWriteErrors(statuses);
-
-    assertEquals(batch1Buckets, new HashSet<>(writeResult.getPartitionToReplaceFileIds().get(testPartitionPath)));
+    // todo fix
+    // assertEquals(batch1Buckets, new HashSet<>(writeResult.getPartitionToReplaceFileIds().get(testPartitionPath)));

Review comment:
       I assume the commented-out assertion will be fixed in this PR itself.
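The commented-out assertion checks that the second commit reports, for `testPartitionPath`, exactly the file groups written by the first batch (`batch1Buckets`). Once `insertOverwrite` returns a bare `JavaRDD<WriteStatus>` instead of `HoodieWriteResult`, `getPartitionToReplaceFileIds()` is no longer available, so the check has nothing to read. The self-contained Java sketch that follows models that check. `WriteResult` and `replacedFileIds` are hypothetical stand-ins, not Hudi classes, and the sketch assumes insert-overwrite replaces every existing file group in each partition that receives new records.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/**
 * Hypothetical model of the check in the commented-out assertion. None of these
 * names are Hudi APIs; they only illustrate the insert-overwrite bookkeeping.
 */
final class ReplacedFileIdsSketch {

  /** Hypothetical result type that keeps the replace metadata next to the write statuses. */
  static final class WriteResult {
    final List<String> writeStatuses;
    final Map<String, List<String>> partitionToReplaceFileIds;

    WriteResult(List<String> writeStatuses, Map<String, List<String>> partitionToReplaceFileIds) {
      this.writeStatuses = writeStatuses;
      this.partitionToReplaceFileIds = partitionToReplaceFileIds;
    }
  }

  /**
   * Assumed insert-overwrite semantics: every existing file group in a partition that
   * receives new records is replaced; partitions without new records are untouched.
   */
  static Map<String, List<String>> replacedFileIds(Map<String, List<String>> existingFileIds,
                                                   Set<String> incomingPartitions) {
    Map<String, List<String>> replaced = new TreeMap<>();
    for (String partition : incomingPartitions) {
      List<String> fileIds = existingFileIds.getOrDefault(partition, List.of());
      if (!fileIds.isEmpty()) {
        replaced.put(partition, new ArrayList<>(fileIds));
      }
    }
    return replaced;
  }

  public static void main(String[] args) {
    // File groups written by the first commit, keyed by partition (hypothetical values).
    Map<String, List<String>> existing = Map.of(
        "p1", List.of("fg-1", "fg-2"),
        "p2", List.of("fg-3"));

    // The second commit overwrites only partition "p1".
    WriteResult result = new WriteResult(List.of("status-1"), replacedFileIds(existing, Set.of("p1")));

    // Same shape as the original assertEquals(batch1Buckets, new HashSet<>(...get(testPartitionPath))).
    Set<String> batch1Buckets = new HashSet<>(existing.get("p1"));
    Set<String> replaced = new HashSet<>(result.partitionToReplaceFileIds.get("p1"));
    if (!batch1Buckets.equals(replaced)) {
      throw new AssertionError("expected " + batch1Buckets + " but got " + replaced);
    }
    System.out.println("replaced file groups: " + result.partitionToReplaceFileIds);
  }
}
```

Keeping a wrapper type as the return value is what lets the test keep asserting on the replace metadata instead of only on write errors.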

##########
File path: hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
##########
@@ -140,6 +140,19 @@ public boolean commit(String instantTime,
     return postWrite(result, instantTime, table);
   }
 
+  @Override
+  public List<WriteStatus> insertOverwrite(List<HoodieRecord<T>> records, String instantTime) {

Review comment:
       Why do we need this operation support in `HoodieJavaWriteClient`? Shouldn't the `SparkRDDWriteClient` support be enough for deltastreamer?

##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
##########
@@ -260,7 +260,8 @@ private boolean onDeltaSyncShutdown(boolean error) {
     public long sourceLimit = Long.MAX_VALUE;
 
     @Parameter(names = {"--op"}, description = "Takes one of these values : UPSERT (default), INSERT (use when input "
-        + "is purely new data/inserts to gain speed)", converter = OperationConverter.class)
+        + "is purely new data/inserts to gain speed) INSERT_OVERWRITE (use when input record can overwrite existing "

Review comment:
       We should also look at `DeltaSync::writeToSink`, which needs to actually trigger the insert_overwrite operation.
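Assuming `writeToSink` chooses the client call by switching on the configured operation, accepting `INSERT_OVERWRITE` in `--op` is not enough on its own; the switch also needs a branch that calls `insertOverwrite`. The self-contained Java sketch that follows uses hypothetical `Operation`, `WriteClient`, and `RecordingClient` types in place of Hudi's `WriteOperationType` and `SparkRDDWriteClient`, whose real signatures differ.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch of a writeToSink-style dispatch. Operation and WriteClient are
 * stand-ins, not Hudi's WriteOperationType or SparkRDDWriteClient.
 */
final class WriteToSinkSketch {

  enum Operation { UPSERT, INSERT, BULK_INSERT, INSERT_OVERWRITE }

  /** Hypothetical client: each call returns one status string per record. */
  interface WriteClient {
    List<String> upsert(List<String> records, String instantTime);
    List<String> insert(List<String> records, String instantTime);
    List<String> bulkInsert(List<String> records, String instantTime);
    List<String> insertOverwrite(List<String> records, String instantTime);
  }

  /** Routes the configured operation to the matching client call. */
  static List<String> writeToSink(WriteClient client, Operation op, List<String> records, String instantTime) {
    switch (op) {
      case UPSERT:
        return client.upsert(records, instantTime);
      case INSERT:
        return client.insert(records, instantTime);
      case BULK_INSERT:
        return client.bulkInsert(records, instantTime);
      case INSERT_OVERWRITE:
        // Without this branch, INSERT_OVERWRITE would fall through to the default
        // branch below instead of reaching the write client.
        return client.insertOverwrite(records, instantTime);
      default:
        throw new IllegalArgumentException("Unknown operation: " + op);
    }
  }

  /** Test double that tags each record with the operation and instant that handled it. */
  static final class RecordingClient implements WriteClient {
    private static List<String> tag(String op, List<String> records, String instantTime) {
      List<String> statuses = new ArrayList<>();
      for (String record : records) {
        statuses.add(op + "@" + instantTime + ":" + record);
      }
      return statuses;
    }

    @Override public List<String> upsert(List<String> r, String t) { return tag("upsert", r, t); }
    @Override public List<String> insert(List<String> r, String t) { return tag("insert", r, t); }
    @Override public List<String> bulkInsert(List<String> r, String t) { return tag("bulk_insert", r, t); }
    @Override public List<String> insertOverwrite(List<String> r, String t) { return tag("insert_overwrite", r, t); }
  }

  public static void main(String[] args) {
    List<String> statuses = writeToSink(new RecordingClient(), Operation.INSERT_OVERWRITE, List.of("a", "b"), "001");
    System.out.println(statuses);
  }
}
```

An explicit `default` that throws makes an unhandled operation fail loudly rather than silently writing nothing.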

##########
File path: hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
##########
@@ -173,9 +173,7 @@ public static HoodieWriteResult doWriteOperation(SparkRDDWriteClient client, Jav
       case UPSERT:
         return new HoodieWriteResult(client.upsert(hoodieRecords, instantTime));
       case INSERT_OVERWRITE:
-        return client.insertOverwrite(hoodieRecords, instantTime);
-      case INSERT_OVERWRITE_TABLE:

Review comment:
       Why are we removing `INSERT_OVERWRITE_TABLE` case here?
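The two cases are not interchangeable. A minimal sketch, assuming the usual semantics: `INSERT_OVERWRITE` replaces file groups only in the existing partitions that the incoming batch touches, while `INSERT_OVERWRITE_TABLE` replaces every existing partition of the table. `OverwriteScopeSketch` and `partitionsToReplace` are hypothetical names used only to model that scope difference; they are not part of Hudi.

```java
import java.util.Set;
import java.util.TreeSet;

/**
 * Hypothetical model of which partitions each overwrite operation replaces.
 * Not Hudi code; it only illustrates the assumed scope difference.
 */
final class OverwriteScopeSketch {

  enum Operation { INSERT_OVERWRITE, INSERT_OVERWRITE_TABLE }

  static Set<String> partitionsToReplace(Operation op, Set<String> tablePartitions, Set<String> incomingPartitions) {
    switch (op) {
      case INSERT_OVERWRITE: {
        // Only existing partitions that receive new records are replaced.
        Set<String> touched = new TreeSet<>(incomingPartitions);
        touched.retainAll(tablePartitions);
        return touched;
      }
      case INSERT_OVERWRITE_TABLE:
        // Every existing partition is replaced, whether or not it receives new records.
        return new TreeSet<>(tablePartitions);
      default:
        throw new IllegalArgumentException("Not an overwrite operation: " + op);
    }
  }

  public static void main(String[] args) {
    Set<String> table = Set.of("p1", "p2", "p3");
    Set<String> incoming = Set.of("p1");
    System.out.println("INSERT_OVERWRITE replaces " + partitionsToReplace(Operation.INSERT_OVERWRITE, table, incoming));
    System.out.println("INSERT_OVERWRITE_TABLE replaces " + partitionsToReplace(Operation.INSERT_OVERWRITE_TABLE, table, incoming));
  }
}
```

Under these assumptions, routing a table-level overwrite through the partition-level path would leave stale data in partitions the batch does not touch, which is why the separate case matters.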

##########
File path: hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
##########
@@ -140,6 +140,19 @@ public boolean commit(String instantTime,
     return postWrite(result, instantTime, table);
   }
 
+  @Override
+  public List<WriteStatus> insertOverwrite(List<HoodieRecord<T>> records, String instantTime) {
+    HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table =
+        getTableAndInitCtx(WriteOperationType.INSERT_OVERWRITE, instantTime);
+    table.validateUpsertSchema();

Review comment:
       We should validate the insert schema here, right?



