Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2019/12/28 05:37:53 UTC

[GitHub] [incubator-hudi] yihua opened a new pull request #1149: [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert

yihua opened a new pull request #1149: [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert
URL: https://github.com/apache/incubator-hudi/pull/1149
 
 
   ## What is the purpose of the pull request
   
   Introduce configurations and new modes of sorting for bulk_insert.
   - New configuration of sorting for bulk_insert:
     - hoodie.bulkinsert.sort.enable: whether to enable sorting during bulk_insert
     - hoodie.bulkinsert.sort.type: which type of sorting to use when sorting is enabled. Two sorting modes are implemented: global sort and local sort inside each RDD partition.
   - Two modes of sorting:
     - Global sort: sort the Hudi records across all RDD partitions for bulk_insert, by using `sortBy()` in Spark
     - Local sort inside each RDD partition: only sort the Hudi records within each RDD partition, by using `repartitionAndSortWithinPartitions()` in Spark
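
To make the difference between the two modes concrete, here is a minimal sketch in plain Java. It is not the PR's implementation: in-memory lists stand in for RDD partitions, the class and method names are hypothetical, and the shuffle that Spark performs is left out. The keys follow the `partitionPath+recordKey` shape used in the diffs in this thread.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class SortModesSketch {

  // Global sort: order all keys, then cut the ordered run into contiguous slices,
  // so every key in slice i precedes every key in slice i + 1.
  // Assumes at least one input partition.
  static List<List<String>> globalSort(List<List<String>> partitions) {
    List<String> all = partitions.stream()
        .flatMap(List::stream)
        .sorted()
        .collect(Collectors.toList());
    int n = partitions.size();
    int sliceSize = (all.size() + n - 1) / n; // ceiling division
    List<List<String>> out = new ArrayList<>();
    for (int i = 0; i < n; i++) {
      int from = Math.min(i * sliceSize, all.size());
      int to = Math.min(from + sliceSize, all.size());
      out.add(new ArrayList<>(all.subList(from, to)));
    }
    return out;
  }

  // Local sort: each partition is sorted on its own, so key ranges of
  // different partitions may still overlap.
  static List<List<String>> localSort(List<List<String>> partitions) {
    return partitions.stream()
        .map(p -> p.stream().sorted().collect(Collectors.toList()))
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<List<String>> input = Arrays.asList(
        Arrays.asList("2019/12/28+k3", "2019/12/27+k9"),
        Arrays.asList("2019/12/27+k1", "2019/12/28+k2"));
    System.out.println("global: " + globalSort(input));
    System.out.println("local:  " + localSort(input));
  }
}
```

With global sort, each output slice covers a disjoint key range; with local sort, records are ordered inside a partition but both partitions can still hold keys from the same range.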
   
   ## Brief change log
   
   Changes files inside the `hudi-client` package (will add more details once ready).
   
   ## Verify this pull request
   
   This change added tests and can be verified as follows:
   
   *(example:)*
   
     - *Added integration tests for end-to-end.*
     - *Added HoodieClientWriteTest to verify the change.*
     - *Manually verified the change by running a job locally.*
   
   ## Committer checklist
   
    - [ ] Has a corresponding JIRA in PR title & commit
    
    - [ ] Commit message is descriptive of the change
    
    - [ ] CI is green
   
    - [ ] Necessary doc changes done or have another open PR
          
    - [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [incubator-hudi] vinothchandar commented on a change in pull request #1149: [WIP] [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on a change in pull request #1149: [WIP] [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert
URL: https://github.com/apache/incubator-hudi/pull/1149#discussion_r362278533
 
 

 ##########
 File path: hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java
 ##########
 @@ -367,20 +370,30 @@ public static SparkConf registerClasses(SparkConf conf) {
     }
   }
 
+  private BulkInsertMapFunction<T> getBulkInsertMapFunction(
+      boolean isSorted, String commitTime, HoodieWriteConfig config, HoodieTable<T> hoodieTable,
+      List<String> fileIDPrefixes) {
+    if (isSorted) {
+      return new BulkInsertMapFunctionForSortedRecords(
+          commitTime, config, hoodieTable, fileIDPrefixes);
+    }
+    return new BulkInsertMapFunctionForNonSortedRecords(
+        commitTime, config, hoodieTable, fileIDPrefixes);
+  }
+
   private JavaRDD<WriteStatus> bulkInsertInternal(JavaRDD<HoodieRecord<T>> dedupedRecords, String commitTime,
       HoodieTable<T> table, Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner) {
 
 Review comment:
   Does it make sense to still have a separate `UserDefinedBulkInsertPartitioner`? Could we just combine this into `BulkInsertInternalPartitioner`?


[GitHub] [incubator-hudi] cdmikechen edited a comment on issue #1149: [WIP] [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert

Posted by GitBox <gi...@apache.org>.
cdmikechen edited a comment on issue #1149: [WIP] [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert
URL: https://github.com/apache/incubator-hudi/pull/1149#issuecomment-569556382
 
 
   I am also following this issue closely. I think bulk insert should be an efficient way to import data for the first time, just like a JDBC batch insert.
   Can we show the improvements in data size, data partition size, bulk insert time, sorting time, and other metrics using a histogram or line chart? That way, the applicability of each sorting mode can be better explained.


[GitHub] [incubator-hudi] n3nash commented on a change in pull request #1149: [WIP] [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert

Posted by GitBox <gi...@apache.org>.
n3nash commented on a change in pull request #1149: [WIP] [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert
URL: https://github.com/apache/incubator-hudi/pull/1149#discussion_r362994779
 
 

 ##########
 File path: hudi-client/src/main/java/org/apache/hudi/func/bulkinsert/NonSortPartitioner.java
 ##########
 @@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.func.bulkinsert;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.spark.api.java.JavaRDD;
+
+public class NonSortPartitioner<T extends HoodieRecordPayload>
 
 Review comment:
   We are using "Sort" in the other names; should we be consistent?


[GitHub] [incubator-hudi] yihua commented on a change in pull request #1149: [WIP] [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert

Posted by GitBox <gi...@apache.org>.
yihua commented on a change in pull request #1149: [WIP] [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert
URL: https://github.com/apache/incubator-hudi/pull/1149#discussion_r371038355
 
 

 ##########
 File path: hudi-client/src/main/java/org/apache/hudi/func/bulkinsert/BulkInsertInternalPartitioner.java
 ##########
 @@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.func.bulkinsert;
+
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.table.UserDefinedBulkInsertPartitioner;
+
+public abstract class BulkInsertInternalPartitioner<T extends HoodieRecordPayload> implements
+    UserDefinedBulkInsertPartitioner<T> {
 
 Review comment:
   @vinothchandar @n3nash @bvaradar Let's reach a consensus on whether to change the `UserDefinedBulkInsertPartitioner` interface. I'm fine either way. Right now I've also changed the interface, so this will be a breaking change.


[GitHub] [hudi] nsivabalan commented on pull request #1149: [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on pull request #1149:
URL: https://github.com/apache/hudi/pull/1149#issuecomment-667082374


   I noticed that the base interface doesn't need to be named "UserDefined" anymore since it's applicable to our own partitioners as well, so I have stripped "UserDefined" from the name. I have also renamed args and variables to userDefinedBulkInsertPartitioner wherever applicable. Once CI succeeds, I will land this PR. @yihua: thanks for helping out with this.





[GitHub] [incubator-hudi] vinothchandar commented on a change in pull request #1149: [WIP] [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on a change in pull request #1149: [WIP] [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert
URL: https://github.com/apache/incubator-hudi/pull/1149#discussion_r374512864
 
 

 ##########
 File path: hudi-client/src/main/java/org/apache/hudi/func/bulkinsert/BulkInsertInternalPartitioner.java
 ##########
 @@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.func.bulkinsert;
+
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.table.UserDefinedBulkInsertPartitioner;
+
+public abstract class BulkInsertInternalPartitioner<T extends HoodieRecordPayload> implements
+    UserDefinedBulkInsertPartitioner<T> {
 
 Review comment:
   I was trying to generalize the existing interface, since what we are doing here is not user-defined anyway.
   
   >but it might be best to avoid a breaking change since it's simple in this case ?
   
   This is also fair, so we can leave the current interface as is and you can just extend/implement it.
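
One way to read "leave the current interface as is" while still exposing the new sortedness flag is a Java 8 default method. The sketch below is only an illustration under that assumption, not what the PR does: the interface and class names are hypothetical, and `List<String>` stands in for `JavaRDD<HoodieRecord<T>>`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class PartitionerCompatSketch {

  // Stand-in for the existing interface (hypothetical, simplified types).
  interface UserDefinedPartitionerSketch {
    List<String> repartitionRecords(List<String> records, int outputPartitions);

    // Hypothetical default: existing implementations compile unchanged
    // and report that their output is not sorted.
    default boolean arePartitionRecordsSorted() {
      return false;
    }
  }

  // A pre-existing custom implementation that knows nothing about the new method.
  static class LegacyPartitioner implements UserDefinedPartitionerSketch {
    @Override
    public List<String> repartitionRecords(List<String> records, int outputPartitions) {
      return records;
    }
  }

  // A built-in partitioner that sorts its output and declares so.
  static class SortingPartitioner implements UserDefinedPartitionerSketch {
    @Override
    public List<String> repartitionRecords(List<String> records, int outputPartitions) {
      List<String> sorted = new ArrayList<>(records);
      Collections.sort(sorted);
      return sorted;
    }

    @Override
    public boolean arePartitionRecordsSorted() {
      return true;
    }
  }

  public static void main(String[] args) {
    System.out.println(new LegacyPartitioner().arePartitionRecordsSorted());
    System.out.println(new SortingPartitioner().arePartitionRecordsSorted());
  }
}
```

The trade-off is that a default of `false` is only safe if the writer treats unsorted input conservatively; that behavior is assumed here, not taken from the PR.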
   


[GitHub] [hudi] nsivabalan commented on pull request #1149: [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on pull request #1149:
URL: https://github.com/apache/hudi/pull/1149#issuecomment-661451741


   Sure, thanks. Once done, please ping me and Vinoth for review.





[GitHub] [incubator-hudi] cdmikechen commented on a change in pull request #1149: [WIP] [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert

Posted by GitBox <gi...@apache.org>.
cdmikechen commented on a change in pull request #1149: [WIP] [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert
URL: https://github.com/apache/incubator-hudi/pull/1149#discussion_r361882256
 
 

 ##########
 File path: hudi-client/src/main/java/org/apache/hudi/func/bulkinsert/RDDPartitionLocalSortPartitioner.java
 ##########
 @@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.func.bulkinsert;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.spark.HashPartitioner;
+import org.apache.spark.api.java.JavaRDD;
+import scala.Tuple2;
+
+public class RDDPartitionLocalSortPartitioner<T extends HoodieRecordPayload>
+    extends BulkInsertInternalPartitioner<T> {
+
+  @Override
+  public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> records,
+      int outputSparkPartitions) {
+    return records.mapToPair(record ->
+        new Tuple2<>(
+            String.format("%s+%s", record.getPartitionPath(), record.getRecordKey()), record))
+        .repartitionAndSortWithinPartitions(new HashPartitioner(outputSparkPartitions))
 
 Review comment:
   `HashPartitioner` partitions records by the hash code of the key. Won't this assign keys from the same range to different partitions? I think this may cause some problems for updates.
   Maybe `RangePartitioner` is better?
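
The concern can be illustrated with a small plain-Java sketch. It does not call Spark; the two helpers are hypothetical stand-ins that mimic the general idea of each partitioner (hash code modulo partition count versus comparison against sorted upper bounds), and the short keys are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;

public class PartitionerSketch {

  // Hash-style assignment: non-negative hash code modulo the partition count.
  static int hashPartition(String key, int numPartitions) {
    return Math.floorMod(key.hashCode(), numPartitions);
  }

  // Range-style assignment: index of the first upper bound that is >= key;
  // keys above every bound go to the last partition.
  static int rangePartition(String key, List<String> sortedUpperBounds) {
    int p = 0;
    while (p < sortedUpperBounds.size() && key.compareTo(sortedUpperBounds.get(p)) > 0) {
      p++;
    }
    return p;
  }

  public static void main(String[] args) {
    List<String> bounds = Arrays.asList("k5"); // two range partitions: <= "k5" and > "k5"
    for (String key : Arrays.asList("k1", "k2", "k7")) {
      System.out.println(key
          + " hash=" + hashPartition(key, 2)
          + " range=" + rangePartition(key, bounds));
    }
  }
}
```

Neighbouring keys such as `k1` and `k2` fall into different hash partitions but the same range partition, which is the behavior the comment above is asking about.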
   


[GitHub] [incubator-hudi] yihua commented on a change in pull request #1149: [WIP] [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert

Posted by GitBox <gi...@apache.org>.
yihua commented on a change in pull request #1149: [WIP] [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert
URL: https://github.com/apache/incubator-hudi/pull/1149#discussion_r385909221
 
 

 ##########
 File path: hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java
 ##########
 @@ -381,20 +384,30 @@ public static SparkConf registerClasses(SparkConf conf) {
     }
   }
 
+  private BulkInsertMapFunction<T> getBulkInsertMapFunction(
 
 Review comment:
   For some subclasses in this PR, there is more complex logic in `BulkInsertMapFunction`, e.g., `BulkInsertMapFunctionForNonSortedRecords`, so I am wondering if we should keep the API for now?


[GitHub] [incubator-hudi] yihua commented on a change in pull request #1149: [WIP] [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert

Posted by GitBox <gi...@apache.org>.
yihua commented on a change in pull request #1149: [WIP] [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert
URL: https://github.com/apache/incubator-hudi/pull/1149#discussion_r362302090
 
 

 ##########
 File path: hudi-client/src/main/java/org/apache/hudi/func/bulkinsert/BulkInsertInternalPartitioner.java
 ##########
 @@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.func.bulkinsert;
+
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.table.UserDefinedBulkInsertPartitioner;
+
+public abstract class BulkInsertInternalPartitioner<T extends HoodieRecordPayload> implements
+    UserDefinedBulkInsertPartitioner<T> {
 
 Review comment:
   Yeah.  I was also wondering about the custom implementation of `UserDefinedBulkInsertPartitioner` in my previous comment..


[GitHub] [incubator-hudi] cdmikechen commented on issue #1149: [WIP] [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert

Posted by GitBox <gi...@apache.org>.
cdmikechen commented on issue #1149: [WIP] [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert
URL: https://github.com/apache/incubator-hudi/pull/1149#issuecomment-569556382
 
 
   I think we can show the improvements in data size, bulk insert time, sorting time, and other metrics using a histogram or line chart. That way, the applicability of each sorting mode can be better explained.


[GitHub] [incubator-hudi] yihua commented on a change in pull request #1149: [WIP] [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert

Posted by GitBox <gi...@apache.org>.
yihua commented on a change in pull request #1149: [WIP] [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert
URL: https://github.com/apache/incubator-hudi/pull/1149#discussion_r362302021
 
 

 ##########
 File path: hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java
 ##########
 @@ -367,20 +370,30 @@ public static SparkConf registerClasses(SparkConf conf) {
     }
   }
 
+  private BulkInsertMapFunction<T> getBulkInsertMapFunction(
+      boolean isSorted, String commitTime, HoodieWriteConfig config, HoodieTable<T> hoodieTable,
+      List<String> fileIDPrefixes) {
+    if (isSorted) {
+      return new BulkInsertMapFunctionForSortedRecords(
+          commitTime, config, hoodieTable, fileIDPrefixes);
+    }
+    return new BulkInsertMapFunctionForNonSortedRecords(
+        commitTime, config, hoodieTable, fileIDPrefixes);
+  }
+
   private JavaRDD<WriteStatus> bulkInsertInternal(JavaRDD<HoodieRecord<T>> dedupedRecords, String commitTime,
       HoodieTable<T> table, Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner) {
 
 Review comment:
   I thought `UserDefinedBulkInsertPartitioner` was user-facing, so I didn't touch the interface. Looking at the source code again, I find that `UserDefinedBulkInsertPartitioner` is only used here, so we can keep a single one, `BulkInsertInternalPartitioner`.
   
   BTW, is this interface (`UserDefinedBulkInsertPartitioner`) intended for users to implement a custom partitioner? I don't see a config for passing in a custom implementation, and I'm wondering if we should provide such flexibility.


[GitHub] [hudi] yihua commented on a change in pull request #1149: [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert

Posted by GitBox <gi...@apache.org>.
yihua commented on a change in pull request #1149:
URL: https://github.com/apache/hudi/pull/1149#discussion_r459776499



##########
File path: hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
##########
@@ -632,6 +647,10 @@ public FileSystemViewStorageConfig getClientSpecifiedViewStorageConfig() {
     return clientSpecifiedViewStorageConfig;
   }
 
+  public boolean getStringFormation() {
+    return Boolean.parseBoolean(props.getProperty("hoodie.tmp.string.format"));

Review comment:
       It's just for local testing to compare `String.format()` vs `StringBuilder`.  `StringBuilder` is more efficient and the reduced overhead is tangible when the number of records is huge.
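
For reference, the two key-building styles being compared look roughly like this. It is a minimal sketch with hypothetical class and method names; it only shows that both produce the same `partitionPath+recordKey` string and makes no timing claim of its own.

```java
public class SortKeySketch {

  // Key built with String.format, as in the earlier hash-partitioner diff.
  static String keyWithFormat(String partitionPath, String recordKey) {
    return String.format("%s+%s", partitionPath, recordKey);
  }

  // Key built with StringBuilder, as in the range-partitioner diff.
  static String keyWithBuilder(String partitionPath, String recordKey) {
    return new StringBuilder()
        .append(partitionPath)
        .append("+")
        .append(recordKey)
        .toString();
  }

  public static void main(String[] args) {
    System.out.println(keyWithFormat("2019/12/28", "key1"));
    System.out.println(keyWithBuilder("2019/12/28", "key1"));
  }
}
```

`String.format` has to parse the format string on every call, which is the per-record overhead the comment refers to.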

##########
File path: hudi-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDPartitionRangePartitioner.java
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.execution.bulkinsert;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+
+import org.apache.spark.RangePartitioner;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+
+import java.io.Serializable;
+import java.util.Comparator;
+
+import scala.Tuple2;
+import scala.math.Ordering;
+import scala.math.Ordering$;
+import scala.reflect.ClassTag;
+import scala.reflect.ClassTag$;
+
+public class RDDPartitionRangePartitioner<T extends HoodieRecordPayload>
+    extends BulkInsertInternalPartitioner<T> implements Serializable {
+  @Override
+  public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> records,
+      int outputSparkPartitions) {
+    JavaPairRDD<String, HoodieRecord<T>> pairRDD = records.mapToPair(record ->
+        new Tuple2(
+            new StringBuilder()
+                .append(record.getPartitionPath())
+                .append("+")
+                .append(record.getRecordKey())
+                .toString(), record));
+    Ordering<String> ordering = Ordering$.MODULE$
+        .comparatorToOrdering(Comparator.<String>naturalOrder());
+    ClassTag<String> classTag = ClassTag$.MODULE$.apply(String.class);

Review comment:
       Not relevant anymore.

##########
File path: hudi-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDPartitionRangePartitioner.java
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.execution.bulkinsert;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+
+import org.apache.spark.RangePartitioner;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+
+import java.io.Serializable;
+import java.util.Comparator;
+
+import scala.Tuple2;
+import scala.math.Ordering;
+import scala.math.Ordering$;
+import scala.reflect.ClassTag;
+import scala.reflect.ClassTag$;
+
+public class RDDPartitionRangePartitioner<T extends HoodieRecordPayload>
+    extends BulkInsertInternalPartitioner<T> implements Serializable {
+  @Override
+  public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> records,
+      int outputSparkPartitions) {
+    JavaPairRDD<String, HoodieRecord<T>> pairRDD = records.mapToPair(record ->
+        new Tuple2(
+            new StringBuilder()
+                .append(record.getPartitionPath())
+                .append("+")
+                .append(record.getRecordKey())
+                .toString(), record));
+    Ordering<String> ordering = Ordering$.MODULE$

Review comment:
       I removed the class.







[GitHub] [hudi] yihua edited a comment on pull request #1149: [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert

Posted by GitBox <gi...@apache.org>.
yihua edited a comment on pull request #1149:
URL: https://github.com/apache/hudi/pull/1149#issuecomment-667345505


   @nsivabalan @n3nash I just recalled that Uber internally implements the `UserDefinedBulkInsertPartitioner` interface for custom partitioners, so renaming the `UserDefinedBulkInsertPartitioner` class is a breaking change.
   
   Shall we keep the name `UserDefinedBulkInsertPartitioner`? We can do this in a separate PR.





[GitHub] [hudi] yihua commented on pull request #1149: [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert

Posted by GitBox <gi...@apache.org>.
yihua commented on pull request #1149:
URL: https://github.com/apache/hudi/pull/1149#issuecomment-661421240


   @nsivabalan Thanks for the fix.  There is some ad-hoc code in this PR just for testing.  Let me clean that up.





[GitHub] [hudi] yihua commented on a change in pull request #1149: [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert

Posted by GitBox <gi...@apache.org>.
yihua commented on a change in pull request #1149:
URL: https://github.com/apache/hudi/pull/1149#discussion_r463442771



##########
File path: hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
##########
@@ -161,6 +161,17 @@ public static void writePartitionMetadata(FileSystem fs, String[] partitionPaths
     }
   }
 
+  public static List<HoodieRecord> newHoodieRecords(int n, String time) throws Exception {

Review comment:
       After some investigation, I found that tests in `TestCopyOnWriteActionExecutor` use a different record schema, causing the bulk insert to fail using records from `HoodieTestDataGenerator::generateInserts`.  For the bulk insert tests, I use the same schema provided by `HoodieTestDataGenerator` and `generateInserts()` for generating the testing records.  Now the tests work.







[GitHub] [incubator-hudi] yihua commented on a change in pull request #1149: [WIP] [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert

Posted by GitBox <gi...@apache.org>.
yihua commented on a change in pull request #1149: [WIP] [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert
URL: https://github.com/apache/incubator-hudi/pull/1149#discussion_r362302100
 
 

 ##########
 File path: hudi-client/src/main/java/org/apache/hudi/table/UserDefinedBulkInsertPartitioner.java
 ##########
 @@ -31,4 +31,6 @@
 public interface UserDefinedBulkInsertPartitioner<T extends HoodieRecordPayload> {
 
   JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> records, int outputSparkPartitions);
+
+  boolean arePartitionRecordsSorted();
 
 Review comment:
   Yeah, will do.



With regards,
Apache Git Services

[GitHub] [hudi] n3nash commented on pull request #1149: [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert

Posted by GitBox <gi...@apache.org>.
n3nash commented on pull request #1149:
URL: https://github.com/apache/hudi/pull/1149#issuecomment-669295110


   @nsivabalan What is the reason for changing the name from `UserDefinedBulkInsertPartitioner` to `BulkInsertPartitioner`? I see that you added a new method to the interface, which is fine, but I want to understand the reason for the name change.





[GitHub] [hudi] vinothchandar commented on a change in pull request #1149: [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on a change in pull request #1149:
URL: https://github.com/apache/hudi/pull/1149#discussion_r463423301



##########
File path: hudi-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitioner.java
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.execution.bulkinsert;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.testutils.HoodieClientTestBase;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.newHoodieRecords;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestBulkInsertInternalPartitioner extends HoodieClientTestBase {
+
+  public static JavaRDD<HoodieRecord> generateTestRecordsForBulkInsert(JavaSparkContext jsc)
+      throws Exception {
+    // RDD partition 1
+    List<HoodieRecord> records1 = newHoodieRecords(3, "2020-07-31T03:16:41.415Z");
+    records1.addAll(newHoodieRecords(2, "2020-08-01T03:16:41.415Z"));
+    records1.addAll(newHoodieRecords(5, "2020-07-31T03:16:41.415Z"));
+    // RDD partition 2
+    List<HoodieRecord> records2 = newHoodieRecords(4, "2020-08-02T03:16:22.415Z");
+    records2.addAll(newHoodieRecords(1, "2020-07-31T03:16:41.415Z"));
+    records2.addAll(newHoodieRecords(5, "2020-08-01T06:16:41.415Z"));
+    return jsc.parallelize(records1, 1).union(jsc.parallelize(records2, 1));
+  }
+
+  public static Map<String, Long> generateExpectedPartitionNumRecords() {
+    Map<String, Long> expectedPartitionNumRecords = new HashMap<>();
+    expectedPartitionNumRecords.put("2020/07/31", 9L);
+    expectedPartitionNumRecords.put("2020/08/01", 7L);
+    expectedPartitionNumRecords.put("2020/08/02", 4L);
+    return expectedPartitionNumRecords;
+  }
+
+  private static JavaRDD<HoodieRecord> generateTripleTestRecordsForBulkInsert(JavaSparkContext jsc)
+      throws Exception {
+    return generateTestRecordsForBulkInsert(jsc).union(generateTestRecordsForBulkInsert(jsc))
+        .union(generateTestRecordsForBulkInsert(jsc));
+  }
+
+  public static Map<String, Long> generateExpectedPartitionNumRecordsTriple() {
+    Map<String, Long> expectedPartitionNumRecords = generateExpectedPartitionNumRecords();
+    for (String partitionPath : expectedPartitionNumRecords.keySet()) {
+      expectedPartitionNumRecords.put(partitionPath,
+          expectedPartitionNumRecords.get(partitionPath) * 3);
+    }
+    return expectedPartitionNumRecords;
+  }
+
+  private static Stream<Arguments> configParams() {
+    Object[][] data = new Object[][] {
+        {BulkInsertInternalPartitioner.BulkInsertSortMode.GLOBAL_SORT, true, true},
+        {BulkInsertInternalPartitioner.BulkInsertSortMode.PARTITION_SORT, false, true},
+        {BulkInsertInternalPartitioner.BulkInsertSortMode.NONE, false, false}
+    };
+    return Stream.of(data).map(Arguments::of);
+  }
+
+  private void verifyRecordAscendingOrder(Iterator<HoodieRecord> records) {
+    HoodieRecord prevRecord = null;
+
+    for (Iterator<HoodieRecord> it = records; it.hasNext(); ) {

Review comment:
       Wouldn't it be simpler to compare two sorted lists for equivalency, instead of hand-crafting this logic of checking for ascending order?
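The suggestion above could be sketched roughly as follows, using plain comparable values in place of `HoodieRecord` sort keys. The names `SortedCheck` and `isAscending` are hypothetical and do not appear in the PR; the only idea illustrated is that a sequence is in ascending order exactly when it equals a sorted copy of itself.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper, not part of the PR: verifies ordering by comparing
// the observed list against a sorted copy of itself.
class SortedCheck {
  static <T extends Comparable<T>> boolean isAscending(List<T> observed) {
    List<T> sorted = new ArrayList<>(observed);
    // Collections.sort is stable, so equal elements keep their relative order.
    Collections.sort(sorted);
    return sorted.equals(observed);
  }
}
```

In a JUnit test the same comparison could be written as `assertEquals(sorted, observed)`, which also reports the differing lists on failure.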

##########
File path: hudi-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitioner.java
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.execution.bulkinsert;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.testutils.HoodieClientTestBase;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.newHoodieRecords;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestBulkInsertInternalPartitioner extends HoodieClientTestBase {
+
+  public static JavaRDD<HoodieRecord> generateTestRecordsForBulkInsert(JavaSparkContext jsc)
+      throws Exception {
+    // RDD partition 1
+    List<HoodieRecord> records1 = newHoodieRecords(3, "2020-07-31T03:16:41.415Z");

Review comment:
       I think we can write this same test using `dataGen.generateInserts()`? 







[GitHub] [hudi] yihua commented on pull request #1149: [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert

Posted by GitBox <gi...@apache.org>.
yihua commented on pull request #1149:
URL: https://github.com/apache/hudi/pull/1149#issuecomment-663276947


   I also simplified the naming of the sorting modes to: `NONE`, `GLOBAL_SORT`, and `PARTITION_SORT`.





[GitHub] [hudi] nsivabalan commented on a change in pull request #1149: [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on a change in pull request #1149:
URL: https://github.com/apache/hudi/pull/1149#discussion_r461966620



##########
File path: hudi-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDPartitionSortPartitioner.java
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.execution.bulkinsert;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+
+import org.apache.spark.api.java.JavaRDD;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import scala.Tuple2;
+
+/**
+ * A built-in partitioner that does local sorting for each RDD partition
+ * after coalesce for bulk insert operation, corresponding to the
+ * {@code BulkInsertSortMode.PARTITION_SORT} mode.
+ *
+ * @param <T> HoodieRecordPayload type
+ */
+public class RDDPartitionSortPartitioner<T extends HoodieRecordPayload>
+    extends BulkInsertInternalPartitioner<T> {
+
+  @Override
+  public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> records,
+                                                     int outputSparkPartitions) {
+    return records.coalesce(outputSparkPartitions)
+        .mapToPair(record ->
+            new Tuple2<>(
+                new StringBuilder()
+                    .append(record.getPartitionPath())
+                    .append("+")
+                    .append(record.getRecordKey())
+                    .toString(), record))
+        .mapPartitions(partition -> {
+          // Sort locally in partition
+          List<Tuple2<String, HoodieRecord<T>>> recordList = new ArrayList<>();
+          for (; partition.hasNext(); ) {
+            recordList.add(partition.next());
+          }
+          Collections.sort(recordList, (o1, o2) -> o1._1.compareTo(o2._1));

Review comment:
       Will sync up offline with you. I'm interested to know the difference between `repartitionAndSortWithinPartitions()` and the current approach. The current approach will bring all records of a partition into memory, right?

##########
File path: hudi-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertInternalPartitioner.java
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.execution.bulkinsert;
+
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.table.UserDefinedBulkInsertPartitioner;
+
+/**
+ * Built-in partitioner to repartition input records into at least expected number of
+ * output spark partitions for bulk insert operation.
+ *
+ * @param <T> HoodieRecordPayload type
+ */
+public abstract class BulkInsertInternalPartitioner<T extends HoodieRecordPayload> implements
+    UserDefinedBulkInsertPartitioner<T> {
+
+  public static BulkInsertInternalPartitioner get(BulkInsertSortMode sortMode) {

Review comment:
       This looks like a factory. Did you consider naming it with a "Factory" suffix or something like that?
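For illustration only, a factory-style split might look like the sketch below. All type names here (`SortMode`, `SketchPartitioner`, `SketchPartitionerFactory`) are hypothetical; only the three mode values come from this thread, and the assumption that both sorted modes report sorted partitions is mine, not something the PR states in this excerpt.

```java
// Hypothetical names throughout; only the mode values appear in the thread.
enum SortMode { NONE, GLOBAL_SORT, PARTITION_SORT }

interface SketchPartitioner {
  boolean arePartitionRecordsSorted();
}

final class SketchPartitionerFactory {
  private SketchPartitionerFactory() {
    // Static factory only; not meant to be instantiated.
  }

  static SketchPartitioner get(SortMode mode) {
    switch (mode) {
      case NONE:
        return () -> false;
      case GLOBAL_SORT:
      case PARTITION_SORT:
        // Assumption: both sorted modes leave each partition sorted.
        return () -> true;
      default:
        throw new IllegalArgumentException("Unsupported sort mode: " + mode);
    }
  }
}
```

Keeping the lookup in a dedicated class leaves the abstract partitioner free of construction logic, which is presumably what the "Factory" suffix would signal.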







[GitHub] [incubator-hudi] vinothchandar commented on a change in pull request #1149: [WIP] [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on a change in pull request #1149: [WIP] [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert
URL: https://github.com/apache/incubator-hudi/pull/1149#discussion_r362277905
 
 

 ##########
 File path: hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
 ##########
 @@ -77,6 +77,10 @@
   private static final String DEFAULT_HOODIE_WRITE_STATUS_CLASS = WriteStatus.class.getName();
   private static final String FINALIZE_WRITE_PARALLELISM = "hoodie.finalize.write.parallelism";
   private static final String DEFAULT_FINALIZE_WRITE_PARALLELISM = DEFAULT_PARALLELISM;
+  private static final String BULKINSERT_SORT_ENABLED = "hoodie.bulkinsert.sort.enable";
 
 Review comment:
   Instead of sorting vs. non-sorting and having two configs, can we just have `hoodie.bulkinsert.write.mode` as the single config, which supports the following values:
   
   - `NONE` (no sorting/shuffling) (default)
   - `GLOBALLY_SORTED`
   - `PARTITION_SORTED`
   
   That might be much easier for the end user to reason about.
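A minimal sketch of how such a single config could be read, assuming the key and value names proposed in this comment. The `WriteMode` and `WriteModeConfig` types are hypothetical and are not the PR's implementation.

```java
import java.util.Locale;
import java.util.Properties;

// Hypothetical sketch; the key and value names are the ones proposed above.
enum WriteMode { NONE, GLOBALLY_SORTED, PARTITION_SORTED }

class WriteModeConfig {
  static final String KEY = "hoodie.bulkinsert.write.mode";

  static WriteMode parse(Properties props) {
    // Fall back to NONE when the key is absent, matching the proposed default.
    String raw = props.getProperty(KEY, WriteMode.NONE.name());
    try {
      return WriteMode.valueOf(raw.trim().toUpperCase(Locale.ROOT));
    } catch (IllegalArgumentException e) {
      throw new IllegalArgumentException(
          "Invalid value '" + raw + "' for " + KEY
              + "; expected NONE, GLOBALLY_SORTED or PARTITION_SORTED", e);
    }
  }
}
```

With one enum-valued key there is no combination such as "sorting disabled but sort type set" for the user to reason about.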


[GitHub] [incubator-hudi] yihua commented on issue #1149: [WIP] [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert

Posted by GitBox <gi...@apache.org>.
yihua commented on issue #1149: [WIP] [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert
URL: https://github.com/apache/incubator-hudi/pull/1149#issuecomment-570019705
 
 
   @vinothchandar Thanks for the code review.  I'll iterate on the code based on the comments and add and fix the unit tests.


[GitHub] [incubator-hudi] cdmikechen commented on a change in pull request #1149: [WIP] [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert

Posted by GitBox <gi...@apache.org>.
cdmikechen commented on a change in pull request #1149: [WIP] [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert
URL: https://github.com/apache/incubator-hudi/pull/1149#discussion_r361882256
 
 

 ##########
 File path: hudi-client/src/main/java/org/apache/hudi/func/bulkinsert/RDDPartitionLocalSortPartitioner.java
 ##########
 @@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.func.bulkinsert;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.spark.HashPartitioner;
+import org.apache.spark.api.java.JavaRDD;
+import scala.Tuple2;
+
+public class RDDPartitionLocalSortPartitioner<T extends HoodieRecordPayload>
+    extends BulkInsertInternalPartitioner<T> {
+
+  @Override
+  public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> records,
+      int outputSparkPartitions) {
+    return records.mapToPair(record ->
+        new Tuple2<>(
+            String.format("%s+%s", record.getPartitionPath(), record.getRecordKey()), record))
+        .repartitionAndSortWithinPartitions(new HashPartitioner(outputSparkPartitions))
 
 Review comment:
   `HashPartitioner` partitions by the hash code of the key. Won't this result in keys from the same range being assigned to different partitions? I think this may cause some problems in subsequent updates.
   Maybe `RangePartitioner` would be better?
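The concern can be illustrated without Spark by a small simulation. The sketch below is an assumption-laden stand-in, not Spark code: `hashPartition` mimics hash placement as a non-negative modulo of the key's hash code, and `rangePartition` mimics range placement against a hypothetical sorted list of upper bounds. The class and method names are invented for this example.

```java
import java.util.List;

// Plain-Java simulation; names and bounds are hypothetical.
class PartitioningSketch {
  // Hash placement: depends only on the key's hash code, so keys that are
  // adjacent in sort order need not land in the same partition.
  static int hashPartition(String key, int numPartitions) {
    return Math.floorMod(key.hashCode(), numPartitions);
  }

  // Range placement: upperBounds must be sorted ascending; a key goes to the
  // first partition whose upper bound is >= the key, else to the last one.
  static int rangePartition(String key, List<String> upperBounds) {
    for (int i = 0; i < upperBounds.size(); i++) {
      if (key.compareTo(upperBounds.get(i)) <= 0) {
        return i;
      }
    }
    return upperBounds.size();
  }
}
```

With a single bound such as `"2020/07/31+zzz"`, every key prefixed with `2020/07/31+` falls into range partition 0, while the hash placement of the same keys is determined by their individual hash codes.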
   


[GitHub] [hudi] yihua commented on a change in pull request #1149: [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert

Posted by GitBox <gi...@apache.org>.
yihua commented on a change in pull request #1149:
URL: https://github.com/apache/hudi/pull/1149#discussion_r463417719



##########
File path: hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
##########
@@ -161,6 +161,17 @@ public static void writePartitionMetadata(FileSystem fs, String[] partitionPaths
     }
   }
 
+  public static List<HoodieRecord> newHoodieRecords(int n, String time) throws Exception {

Review comment:
       Looks like the method is different from other methods in the generator.  I'll keep it as is.







[GitHub] [incubator-hudi] yihua commented on a change in pull request #1149: [WIP] [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert

Posted by GitBox <gi...@apache.org>.
yihua commented on a change in pull request #1149: [WIP] [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert
URL: https://github.com/apache/incubator-hudi/pull/1149#discussion_r362302051
 
 

 ##########
 File path: hudi-client/src/main/java/org/apache/hudi/func/CopyOnWriteInsertHandler.java
 ##########
 @@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.func;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hudi.WriteStatus;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.func.CopyOnWriteLazyInsertIterable.HoodieInsertValueGenResult;
+import org.apache.hudi.io.HoodieCreateHandle;
+import org.apache.hudi.io.HoodieWriteHandle;
+import org.apache.hudi.table.HoodieTable;
+
+/**
+ * Consumes stream of hoodie records from in-memory queue and writes to one or more create-handles.
 
 Review comment:
   Will do.


[GitHub] [incubator-hudi] vinothchandar commented on a change in pull request #1149: [WIP] [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on a change in pull request #1149: [WIP] [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert
URL: https://github.com/apache/incubator-hudi/pull/1149#discussion_r362366288
 
 

 ##########
 File path: hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java
 ##########
 @@ -367,20 +370,30 @@ public static SparkConf registerClasses(SparkConf conf) {
     }
   }
 
+  private BulkInsertMapFunction<T> getBulkInsertMapFunction(
+      boolean isSorted, String commitTime, HoodieWriteConfig config, HoodieTable<T> hoodieTable,
+      List<String> fileIDPrefixes) {
+    if (isSorted) {
+      return new BulkInsertMapFunctionForSortedRecords(
+          commitTime, config, hoodieTable, fileIDPrefixes);
+    }
+    return new BulkInsertMapFunctionForNonSortedRecords(
+        commitTime, config, hoodieTable, fileIDPrefixes);
+  }
+
   private JavaRDD<WriteStatus> bulkInsertInternal(JavaRDD<HoodieRecord<T>> dedupedRecords, String commitTime,
       HoodieTable<T> table, Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner) {
 
 Review comment:
   This was added by @ovj, and the uber/marmaray project passes in an implementation using the RDD API.


[GitHub] [hudi] yihua commented on a change in pull request #1149: [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert

Posted by GitBox <gi...@apache.org>.
yihua commented on a change in pull request #1149:
URL: https://github.com/apache/hudi/pull/1149#discussion_r458381933



##########
File path: hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
##########
@@ -245,6 +250,16 @@ public int getMaxConsistencyCheckIntervalMs() {
     return Integer.parseInt(props.getProperty(MAX_CONSISTENCY_CHECK_INTERVAL_MS_PROP));
   }
 
+  public BulkInsertSortMode getBulkInsertSortMode() {
+    String sortMode = props.getProperty(BULKINSERT_SORT_MODE);
+    try {
+      return BulkInsertSortMode.valueOf(sortMode.toUpperCase());
+    } catch (IllegalArgumentException e) {

Review comment:
       Makes sense.

##########
File path: hudi-client/src/main/java/org/apache/hudi/execution/CopyOnWriteInsertHandler.java
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.execution;
+
+import org.apache.hudi.client.SparkTaskContextSupplier;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.execution.LazyInsertIterable.HoodieInsertValueGenResult;
+import org.apache.hudi.io.HoodieWriteHandle;
+import org.apache.hudi.io.WriteHandleFactory;
+import org.apache.hudi.table.HoodieTable;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Consumes stream of hoodie records from in-memory queue and writes to one or more create-handles.
+ */
+public class CopyOnWriteInsertHandler<T extends HoodieRecordPayload>
+    extends

Review comment:
       Fixed.  It's due to my IDE's hard wrap at 100 chars.  

##########
File path: hudi-client/src/main/java/org/apache/hudi/execution/CopyOnWriteInsertHandler.java
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.execution;
+
+import org.apache.hudi.client.SparkTaskContextSupplier;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.execution.LazyInsertIterable.HoodieInsertValueGenResult;
+import org.apache.hudi.io.HoodieWriteHandle;
+import org.apache.hudi.io.WriteHandleFactory;
+import org.apache.hudi.table.HoodieTable;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Consumes stream of hoodie records from in-memory queue and writes to one or more create-handles.
+ */
+public class CopyOnWriteInsertHandler<T extends HoodieRecordPayload>
+    extends
+    BoundedInMemoryQueueConsumer<HoodieInsertValueGenResult<HoodieRecord>, List<WriteStatus>> {
+
+  protected HoodieWriteConfig config;
+  protected String instantTime;
+  protected HoodieTable<T> hoodieTable;
+  protected String idPrefix;
+  protected int numFilesWritten;
+  protected SparkTaskContextSupplier sparkTaskContextSupplier;
+  protected WriteHandleFactory<T> writeHandleFactory;
+
+  protected final List<WriteStatus> statuses = new ArrayList<>();
+  protected Map<String, HoodieWriteHandle> handles = new HashMap<>();
+
+  public CopyOnWriteInsertHandler(
+      HoodieWriteConfig config, String instantTime, HoodieTable<T> hoodieTable, String idPrefix,
+      SparkTaskContextSupplier sparkTaskContextSupplier, WriteHandleFactory<T> writeHandleFactory) {
+    this.config = config;
+    this.instantTime = instantTime;
+    this.hoodieTable = hoodieTable;
+    this.idPrefix = idPrefix;
+    this.numFilesWritten = 0;
+    this.sparkTaskContextSupplier = sparkTaskContextSupplier;
+    this.writeHandleFactory = writeHandleFactory;
+  }
+
+  @Override
+  public void consumeOneRecord(HoodieInsertValueGenResult<HoodieRecord> payload) {
+    final HoodieRecord insertPayload = payload.record;
+    String partitionPath = insertPayload.getPartitionPath();
+    HoodieWriteHandle handle = handles.get(partitionPath);
+    // lazily initialize the handle, for the first time
+    if (handle == null) {
+      handle = writeHandleFactory.create(
+          config, instantTime, hoodieTable, insertPayload.getPartitionPath(),
+          idPrefix, sparkTaskContextSupplier);
+      handles.put(partitionPath, handle);
+    }
+
+    if (handle.canWrite(payload.record)) {
+      // write the payload, if the handle has capacity
+      handle.write(insertPayload, payload.insertValue, payload.exception);
+    } else {
+      // handle is full.
+      statuses.add(handle.close());
+      // Need to handle the rejected payload & open new handle
+      handle = writeHandleFactory.create(
+          config, instantTime, hoodieTable, insertPayload.getPartitionPath(),
+          idPrefix, sparkTaskContextSupplier);
+      handles.put(partitionPath, handle);
+      handle.write(insertPayload, payload.insertValue,
+          payload.exception); // we should be able to write 1 payload.
+    }
+  }
+
+  @Override
+  public void finish() {
+    for (HoodieWriteHandle handle : handles.values()) {

Review comment:
       So the `HoodieWriteHandle` instances stored in the `handles` mapping are all still open.  If you look at L84-92, once a handle is closed, the `WriteStatus` it returns is added to `statuses`, and the handle is removed from the mapping by being replaced with the newly opened handle.
   
   And I assume that we should return the `WriteStatus` instances from all handles in this case, whether a handle was closed midway or only at the end.
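
   To make the handle lifecycle described above concrete, here is a minimal, Spark-free sketch. It is not the code in this PR: `OpenHandleTrackerSketch`, `SketchHandle`, the record-count capacity, and the string statuses are all hypothetical stand-ins for `CopyOnWriteInsertHandler`, `HoodieWriteHandle`, and `WriteStatus`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: the map only ever holds open handles, so finish()
// closes each handle exactly once and close() need not be idempotent.
final class OpenHandleTrackerSketch {

  // Stand-in for HoodieWriteHandle: accepts a fixed number of records.
  static final class SketchHandle {
    private final String partitionPath;
    private final int capacity;
    private int written = 0;

    SketchHandle(String partitionPath, int capacity) {
      this.partitionPath = partitionPath;
      this.capacity = capacity;
    }

    boolean canWrite() {
      return written < capacity;
    }

    void write(String record) {
      written++;
    }

    // Returns a string standing in for WriteStatus.
    String close() {
      return partitionPath + ":" + written;
    }
  }

  private final Map<String, SketchHandle> handles = new HashMap<>();
  private final List<String> statuses = new ArrayList<>();
  private final int capacity;

  OpenHandleTrackerSketch(int capacity) {
    this.capacity = capacity;
  }

  void consume(String partitionPath, String record) {
    // Lazily open a handle the first time a partition path is seen.
    SketchHandle handle =
        handles.computeIfAbsent(partitionPath, p -> new SketchHandle(p, capacity));
    if (!handle.canWrite()) {
      // Handle is full: close it, keep its status, and replace it in the map.
      statuses.add(handle.close());
      handle = new SketchHandle(partitionPath, capacity);
      handles.put(partitionPath, handle);
    }
    handle.write(record);
  }

  List<String> finish() {
    // Everything left in the map is still open; close it and collect the status.
    for (SketchHandle handle : handles.values()) {
      statuses.add(handle.close());
    }
    handles.clear();
    return statuses;
  }
}
```

   With a capacity of 2 and three records for the same partition path, the first handle is closed midway and the second one is closed in `finish()`, so both statuses end up in the returned list.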

##########
File path: hudi-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertMapFunctionForNonSortedRecords.java
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.execution.bulkinsert;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.execution.CopyOnWriteInsertHandler;
+import org.apache.hudi.execution.LazyInsertIterable.HoodieInsertValueGenResult;
+import org.apache.hudi.io.CreateHandleFactory;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.avro.Schema;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+public class BulkInsertMapFunctionForNonSortedRecords<T extends HoodieRecordPayload>
+    extends BulkInsertMapFunction<T> {
+
+  Map<String, CopyOnWriteInsertHandler> parallelWritersMap;

Review comment:
       Fixed.

##########
File path: hudi-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDPartitionRangePartitioner.java
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.execution.bulkinsert;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+
+import org.apache.spark.RangePartitioner;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+
+import java.io.Serializable;
+import java.util.Comparator;
+
+import scala.Tuple2;
+import scala.math.Ordering;
+import scala.math.Ordering$;
+import scala.reflect.ClassTag;
+import scala.reflect.ClassTag$;
+
+public class RDDPartitionRangePartitioner<T extends HoodieRecordPayload>
+    extends BulkInsertInternalPartitioner<T> implements Serializable {
+  @Override
+  public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> records,
+      int outputSparkPartitions) {
+    JavaPairRDD<String, HoodieRecord<T>> pairRDD = records.mapToPair(record ->
+        new Tuple2(
+            new StringBuilder()
+                .append(record.getPartitionPath())
+                .append("+")
+                .append(record.getRecordKey())
+                .toString(), record));
+    Ordering<String> ordering = Ordering$.MODULE$

Review comment:
       `RangePartitioner` is a Scala class, so if we have to use it, we have to import Scala classes.  In this case, shall we avoid this mode of sorting?

##########
File path: hudi-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertHelper.java
##########
@@ -56,31 +59,50 @@
 
     final JavaRDD<HoodieRecord<T>> repartitionedRecords;
     final int parallelism = config.getBulkInsertShuffleParallelism();
+    boolean arePartitionRecordsSorted = true;
     if (bulkInsertPartitioner.isPresent()) {
-      repartitionedRecords = bulkInsertPartitioner.get().repartitionRecords(dedupedRecords, parallelism);
+      repartitionedRecords = bulkInsertPartitioner.get()
+          .repartitionRecords(dedupedRecords, parallelism);
+      arePartitionRecordsSorted = bulkInsertPartitioner.get().arePartitionRecordsSorted();
     } else {
-      // Now, sort the records and line them up nicely for loading.
-      repartitionedRecords = dedupedRecords.sortBy(record -> {
-        // Let's use "partitionPath + key" as the sort key. Spark, will ensure
-        // the records split evenly across RDD partitions, such that small partitions fit
-        // into 1 RDD partition, while big ones spread evenly across multiple RDD partitions
-        return String.format("%s+%s", record.getPartitionPath(), record.getRecordKey());
-      }, true, parallelism);
+      BulkInsertInternalPartitioner partitioner =
+          BulkInsertInternalPartitioner.get(config.getBulkInsertSortMode());
+      repartitionedRecords = partitioner.repartitionRecords(dedupedRecords, parallelism);
+      arePartitionRecordsSorted = partitioner.arePartitionRecordsSorted();

Review comment:
       Right, simplified the logic to make it clear.

##########
File path: hudi-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDPartitionRangePartitioner.java
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.execution.bulkinsert;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+
+import org.apache.spark.RangePartitioner;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+
+import java.io.Serializable;
+import java.util.Comparator;
+
+import scala.Tuple2;
+import scala.math.Ordering;
+import scala.math.Ordering$;
+import scala.reflect.ClassTag;
+import scala.reflect.ClassTag$;
+
+public class RDDPartitionRangePartitioner<T extends HoodieRecordPayload>
+    extends BulkInsertInternalPartitioner<T> implements Serializable {
+  @Override
+  public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> records,
+      int outputSparkPartitions) {
+    JavaPairRDD<String, HoodieRecord<T>> pairRDD = records.mapToPair(record ->
+        new Tuple2(
+            new StringBuilder()
+                .append(record.getPartitionPath())
+                .append("+")
+                .append(record.getRecordKey())
+                .toString(), record));
+    Ordering<String> ordering = Ordering$.MODULE$
+        .comparatorToOrdering(Comparator.<String>naturalOrder());
+    ClassTag<String> classTag = ClassTag$.MODULE$.apply(String.class);
+    return pairRDD.partitionBy(new RangePartitioner<String, HoodieRecord<T>>(

Review comment:
       This logic range-partitions the records so that the key ranges of the RDD partitions are mutually exclusive, but the data inside each RDD partition is not sorted.  Do you think we need this?  If not, I can remove this class (which also gets rid of the Scala imports).
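
   As a rough illustration of the difference, the following Spark-free sketch range-partitions string keys into buckets and shows that the buckets cover disjoint key ranges while keeping arrival order inside each bucket. `RangePartitionSketch`, `rangePartition`, `sortWithinBuckets`, and the hand-picked upper bounds are hypothetical; Spark's `RangePartitioner` samples the data to choose its bounds instead.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch: range partitioning alone versus range partitioning
// followed by a local sort of every bucket.
final class RangePartitionSketch {

  // Puts each key into the first bucket whose upper bound is >= the key.
  // The last bucket has no upper bound.
  static List<List<String>> rangePartition(List<String> keys, List<String> upperBounds) {
    List<List<String>> buckets = new ArrayList<>();
    for (int i = 0; i <= upperBounds.size(); i++) {
      buckets.add(new ArrayList<>());
    }
    for (String key : keys) {
      int idx = 0;
      while (idx < upperBounds.size() && key.compareTo(upperBounds.get(idx)) > 0) {
        idx++;
      }
      // Keys are appended in arrival order, so a bucket is not sorted.
      buckets.get(idx).add(key);
    }
    return buckets;
  }

  // Sorting every bucket locally yields a globally sorted layout, because
  // the buckets already cover disjoint, ordered key ranges.
  static void sortWithinBuckets(List<List<String>> buckets) {
    for (List<String> bucket : buckets) {
      Collections.sort(bucket);
    }
  }
}
```

   Only after `sortWithinBuckets` does reading the buckets in order give the same result as a global sort on the "partitionPath+recordKey" key.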

##########
File path: hudi-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDPartitionRangePartitioner.java
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.execution.bulkinsert;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+
+import org.apache.spark.RangePartitioner;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+
+import java.io.Serializable;
+import java.util.Comparator;
+
+import scala.Tuple2;
+import scala.math.Ordering;
+import scala.math.Ordering$;
+import scala.reflect.ClassTag;
+import scala.reflect.ClassTag$;
+
+public class RDDPartitionRangePartitioner<T extends HoodieRecordPayload>

Review comment:
       Yes.  I'll take a pass for all the classes added.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] yihua commented on a change in pull request #1149: [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert

Posted by GitBox <gi...@apache.org>.
yihua commented on a change in pull request #1149:
URL: https://github.com/apache/hudi/pull/1149#discussion_r462703978



##########
File path: hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
##########
@@ -161,6 +161,17 @@ public static void writePartitionMetadata(FileSystem fs, String[] partitionPaths
     }
   }
 
+  public static List<HoodieRecord> newHoodieRecords(int n, String time) throws Exception {

Review comment:
       I copied this method from the `TestCopyOnWriteActionExecutor` class.  Let me check if I can generalize this.

##########
File path: hudi-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertInternalPartitioner.java
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.execution.bulkinsert;
+
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.table.UserDefinedBulkInsertPartitioner;
+
+/**
+ * Built-in partitioner to repartition input records into at least expected number of
+ * output spark partitions for bulk insert operation.
+ *
+ * @param <T> HoodieRecordPayload type
+ */
+public abstract class BulkInsertInternalPartitioner<T extends HoodieRecordPayload> implements
+    UserDefinedBulkInsertPartitioner<T> {
+
+  public static BulkInsertInternalPartitioner get(BulkInsertSortMode sortMode) {

Review comment:
       Yes, good suggestion.  Let me do that.

##########
File path: hudi-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDPartitionSortPartitioner.java
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.execution.bulkinsert;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+
+import org.apache.spark.api.java.JavaRDD;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import scala.Tuple2;
+
+/**
+ * A built-in partitioner that does local sorting for each RDD partition
+ * after coalesce for bulk insert operation, corresponding to the
+ * {@code BulkInsertSortMode.PARTITION_SORT} mode.
+ *
+ * @param <T> HoodieRecordPayload type
+ */
+public class RDDPartitionSortPartitioner<T extends HoodieRecordPayload>
+    extends BulkInsertInternalPartitioner<T> {
+
+  @Override
+  public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> records,
+                                                     int outputSparkPartitions) {
+    return records.coalesce(outputSparkPartitions)
+        .mapToPair(record ->
+            new Tuple2<>(
+                new StringBuilder()
+                    .append(record.getPartitionPath())
+                    .append("+")
+                    .append(record.getRecordKey())
+                    .toString(), record))
+        .mapPartitions(partition -> {
+          // Sort locally in partition
+          List<Tuple2<String, HoodieRecord<T>>> recordList = new ArrayList<>();
+          for (; partition.hasNext(); ) {
+            recordList.add(partition.next());
+          }
+          Collections.sort(recordList, (o1, o2) -> o1._1.compareTo(o2._1));

Review comment:
       Yes, let's sync offline.







[GitHub] [hudi] vinothchandar commented on a change in pull request #1149: [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on a change in pull request #1149:
URL: https://github.com/apache/hudi/pull/1149#discussion_r457859023



##########
File path: hudi-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDPartitionRangePartitioner.java
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.execution.bulkinsert;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+
+import org.apache.spark.RangePartitioner;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+
+import java.io.Serializable;
+import java.util.Comparator;
+
+import scala.Tuple2;
+import scala.math.Ordering;
+import scala.math.Ordering$;
+import scala.reflect.ClassTag;
+import scala.reflect.ClassTag$;
+
+public class RDDPartitionRangePartitioner<T extends HoodieRecordPayload>
+    extends BulkInsertInternalPartitioner<T> implements Serializable {
+  @Override
+  public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> records,
+      int outputSparkPartitions) {
+    JavaPairRDD<String, HoodieRecord<T>> pairRDD = records.mapToPair(record ->
+        new Tuple2(
+            new StringBuilder()
+                .append(record.getPartitionPath())
+                .append("+")
+                .append(record.getRecordKey())
+                .toString(), record));
+    Ordering<String> ordering = Ordering$.MODULE$

Review comment:
       Can we write this without importing Scala classes here? It will be problematic in terms of bundling.

##########
File path: hudi-client/src/main/java/org/apache/hudi/execution/CopyOnWriteInsertHandler.java
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.execution;
+
+import org.apache.hudi.client.SparkTaskContextSupplier;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.execution.LazyInsertIterable.HoodieInsertValueGenResult;
+import org.apache.hudi.io.HoodieWriteHandle;
+import org.apache.hudi.io.WriteHandleFactory;
+import org.apache.hudi.table.HoodieTable;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Consumes stream of hoodie records from in-memory queue and writes to one or more create-handles.
+ */
+public class CopyOnWriteInsertHandler<T extends HoodieRecordPayload>
+    extends
+    BoundedInMemoryQueueConsumer<HoodieInsertValueGenResult<HoodieRecord>, List<WriteStatus>> {
+
+  protected HoodieWriteConfig config;
+  protected String instantTime;
+  protected HoodieTable<T> hoodieTable;
+  protected String idPrefix;
+  protected int numFilesWritten;
+  protected SparkTaskContextSupplier sparkTaskContextSupplier;
+  protected WriteHandleFactory<T> writeHandleFactory;
+
+  protected final List<WriteStatus> statuses = new ArrayList<>();
+  protected Map<String, HoodieWriteHandle> handles = new HashMap<>();
+
+  public CopyOnWriteInsertHandler(
+      HoodieWriteConfig config, String instantTime, HoodieTable<T> hoodieTable, String idPrefix,
+      SparkTaskContextSupplier sparkTaskContextSupplier, WriteHandleFactory<T> writeHandleFactory) {
+    this.config = config;
+    this.instantTime = instantTime;
+    this.hoodieTable = hoodieTable;
+    this.idPrefix = idPrefix;
+    this.numFilesWritten = 0;
+    this.sparkTaskContextSupplier = sparkTaskContextSupplier;
+    this.writeHandleFactory = writeHandleFactory;
+  }
+
+  @Override
+  public void consumeOneRecord(HoodieInsertValueGenResult<HoodieRecord> payload) {
+    final HoodieRecord insertPayload = payload.record;
+    String partitionPath = insertPayload.getPartitionPath();
+    HoodieWriteHandle handle = handles.get(partitionPath);
+    // lazily initialize the handle, for the first time
+    if (handle == null) {
+      handle = writeHandleFactory.create(
+          config, instantTime, hoodieTable, insertPayload.getPartitionPath(),
+          idPrefix, sparkTaskContextSupplier);
+      handles.put(partitionPath, handle);
+    }
+
+    if (handle.canWrite(payload.record)) {
+      // write the payload, if the handle has capacity
+      handle.write(insertPayload, payload.insertValue, payload.exception);
+    } else {
+      // handle is full.
+      statuses.add(handle.close());
+      // Need to handle the rejected payload & open new handle
+      handle = writeHandleFactory.create(
+          config, instantTime, hoodieTable, insertPayload.getPartitionPath(),
+          idPrefix, sparkTaskContextSupplier);
+      handles.put(partitionPath, handle);
+      handle.write(insertPayload, payload.insertValue,
+          payload.exception); // we should be able to write 1 payload.
+    }
+  }
+
+  @Override
+  public void finish() {
+    for (HoodieWriteHandle handle : handles.values()) {

Review comment:
       So this implies that `handle.close()` needs to be idempotent? If we are closing the handle in L85 already, why close everything from the map again?

##########
File path: hudi-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertMapFunctionForNonSortedRecords.java
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.execution.bulkinsert;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.execution.CopyOnWriteInsertHandler;
+import org.apache.hudi.execution.LazyInsertIterable.HoodieInsertValueGenResult;
+import org.apache.hudi.io.CreateHandleFactory;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.avro.Schema;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+public class BulkInsertMapFunctionForNonSortedRecords<T extends HoodieRecordPayload>
+    extends BulkInsertMapFunction<T> {
+
+  Map<String, CopyOnWriteInsertHandler> parallelWritersMap;

Review comment:
       make this private? 

##########
File path: hudi-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertHelper.java
##########
@@ -56,31 +59,50 @@
 
     final JavaRDD<HoodieRecord<T>> repartitionedRecords;
     final int parallelism = config.getBulkInsertShuffleParallelism();
+    boolean arePartitionRecordsSorted = true;
     if (bulkInsertPartitioner.isPresent()) {
-      repartitionedRecords = bulkInsertPartitioner.get().repartitionRecords(dedupedRecords, parallelism);
+      repartitionedRecords = bulkInsertPartitioner.get()
+          .repartitionRecords(dedupedRecords, parallelism);
+      arePartitionRecordsSorted = bulkInsertPartitioner.get().arePartitionRecordsSorted();
     } else {
-      // Now, sort the records and line them up nicely for loading.
-      repartitionedRecords = dedupedRecords.sortBy(record -> {
-        // Let's use "partitionPath + key" as the sort key. Spark, will ensure
-        // the records split evenly across RDD partitions, such that small partitions fit
-        // into 1 RDD partition, while big ones spread evenly across multiple RDD partitions
-        return String.format("%s+%s", record.getPartitionPath(), record.getRecordKey());
-      }, true, parallelism);
+      BulkInsertInternalPartitioner partitioner =
+          BulkInsertInternalPartitioner.get(config.getBulkInsertSortMode());
+      repartitionedRecords = partitioner.repartitionRecords(dedupedRecords, parallelism);
+      arePartitionRecordsSorted = partitioner.arePartitionRecordsSorted();

Review comment:
       This assignment can be done just once, outside the if/else?  (IntelliJ tips also indicated that, IIRC.)
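
   One possible shape for this, as a Spark-free sketch: pick the partitioner once, then do both assignments a single time. `PartitionerSelectionSketch`, its nested `Partitioner` interface, `Result`, and the use of plain strings instead of `JavaRDD<HoodieRecord<T>>` are hypothetical simplifications, not the code in this PR.

```java
import java.util.Optional;

// Hypothetical sketch: choose the partitioner first, assign once afterwards.
final class PartitionerSelectionSketch {

  // Stand-in for UserDefinedBulkInsertPartitioner.
  interface Partitioner {
    String repartition(String records);

    boolean arePartitionRecordsSorted();
  }

  // Carries the two values that the original if/else assigned in each branch.
  static final class Result {
    final String repartitionedRecords;
    final boolean arePartitionRecordsSorted;

    Result(String repartitionedRecords, boolean arePartitionRecordsSorted) {
      this.repartitionedRecords = repartitionedRecords;
      this.arePartitionRecordsSorted = arePartitionRecordsSorted;
    }
  }

  static Result run(Optional<Partitioner> userDefined, Partitioner builtIn, String records) {
    // Fall back to the built-in partitioner when no user-defined one is given.
    Partitioner partitioner = userDefined.orElse(builtIn);
    // Both values now come from one place, so neither assignment is duplicated.
    return new Result(
        partitioner.repartition(records), partitioner.arePartitionRecordsSorted());
  }
}
```

   This also removes the need for a default value of `arePartitionRecordsSorted` before the branch.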

##########
File path: hudi-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertHelper.java
##########
@@ -56,31 +59,50 @@
 
     final JavaRDD<HoodieRecord<T>> repartitionedRecords;
     final int parallelism = config.getBulkInsertShuffleParallelism();
+    boolean arePartitionRecordsSorted = true;
     if (bulkInsertPartitioner.isPresent()) {
-      repartitionedRecords = bulkInsertPartitioner.get().repartitionRecords(dedupedRecords, parallelism);
+      repartitionedRecords = bulkInsertPartitioner.get()

Review comment:
       Just like here, let's bring lines 64-65 onto a single line?

##########
File path: hudi-client/src/main/java/org/apache/hudi/table/UserDefinedBulkInsertPartitioner.java
##########
@@ -31,4 +31,6 @@
 public interface UserDefinedBulkInsertPartitioner<T extends HoodieRecordPayload> {
 
   JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> records, int outputSparkPartitions);
+
+  boolean arePartitionRecordsSorted();

Review comment:
       javadoc please 

##########
File path: hudi-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDPartitionRangePartitioner.java
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.execution.bulkinsert;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+
+import org.apache.spark.RangePartitioner;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+
+import java.io.Serializable;
+import java.util.Comparator;
+
+import scala.Tuple2;
+import scala.math.Ordering;
+import scala.math.Ordering$;
+import scala.reflect.ClassTag;
+import scala.reflect.ClassTag$;
+
+public class RDDPartitionRangePartitioner<T extends HoodieRecordPayload>
+    extends BulkInsertInternalPartitioner<T> implements Serializable {
+  @Override
+  public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> records,
+      int outputSparkPartitions) {
+    JavaPairRDD<String, HoodieRecord<T>> pairRDD = records.mapToPair(record ->
+        new Tuple2(
+            new StringBuilder()
+                .append(record.getPartitionPath())
+                .append("+")
+                .append(record.getRecordKey())
+                .toString(), record));
+    Ordering<String> ordering = Ordering$.MODULE$
+        .comparatorToOrdering(Comparator.<String>naturalOrder());
+    ClassTag<String> classTag = ClassTag$.MODULE$.apply(String.class);
+    return pairRDD.partitionBy(new RangePartitioner<String, HoodieRecord<T>>(

Review comment:
       Isn't this exactly what `sortBy` will do? Are we somehow picking a better range? I'd like to skip this implementation otherwise.

##########
File path: hudi-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDPartitionRangePartitioner.java
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.execution.bulkinsert;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+
+import org.apache.spark.RangePartitioner;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+
+import java.io.Serializable;
+import java.util.Comparator;
+
+import scala.Tuple2;
+import scala.math.Ordering;
+import scala.math.Ordering$;
+import scala.reflect.ClassTag;
+import scala.reflect.ClassTag$;
+
+public class RDDPartitionRangePartitioner<T extends HoodieRecordPayload>
+    extends BulkInsertInternalPartitioner<T> implements Serializable {
+  @Override
+  public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> records,
+      int outputSparkPartitions) {
+    JavaPairRDD<String, HoodieRecord<T>> pairRDD = records.mapToPair(record ->
+        new Tuple2(
+            new StringBuilder()
+                .append(record.getPartitionPath())
+                .append("+")
+                .append(record.getRecordKey())
+                .toString(), record));
+    Ordering<String> ordering = Ordering$.MODULE$
+        .comparatorToOrdering(Comparator.<String>naturalOrder());
+    ClassTag<String> classTag = ClassTag$.MODULE$.apply(String.class);

Review comment:
       Let's implement these using just Java class objects?

##########
File path: hudi-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDPartitionRangePartitioner.java
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.execution.bulkinsert;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+
+import org.apache.spark.RangePartitioner;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+
+import java.io.Serializable;
+import java.util.Comparator;
+
+import scala.Tuple2;
+import scala.math.Ordering;
+import scala.math.Ordering$;
+import scala.reflect.ClassTag;
+import scala.reflect.ClassTag$;
+
+public class RDDPartitionRangePartitioner<T extends HoodieRecordPayload>

Review comment:
       Also, please add Javadocs for all these classes. Even if it's a single line, it makes the code so much more readable in the long run.

##########
File path: hudi-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertHelper.java
##########
@@ -56,31 +59,50 @@
 
     final JavaRDD<HoodieRecord<T>> repartitionedRecords;
     final int parallelism = config.getBulkInsertShuffleParallelism();
+    boolean arePartitionRecordsSorted = true;
     if (bulkInsertPartitioner.isPresent()) {
-      repartitionedRecords = bulkInsertPartitioner.get().repartitionRecords(dedupedRecords, parallelism);
+      repartitionedRecords = bulkInsertPartitioner.get()
+          .repartitionRecords(dedupedRecords, parallelism);
+      arePartitionRecordsSorted = bulkInsertPartitioner.get().arePartitionRecordsSorted();
     } else {
-      // Now, sort the records and line them up nicely for loading.
-      repartitionedRecords = dedupedRecords.sortBy(record -> {
-        // Let's use "partitionPath + key" as the sort key. Spark, will ensure
-        // the records split evenly across RDD partitions, such that small partitions fit
-        // into 1 RDD partition, while big ones spread evenly across multiple RDD partitions
-        return String.format("%s+%s", record.getPartitionPath(), record.getRecordKey());
-      }, true, parallelism);
+      BulkInsertInternalPartitioner partitioner =
+          BulkInsertInternalPartitioner.get(config.getBulkInsertSortMode());
+      repartitionedRecords = partitioner.repartitionRecords(dedupedRecords, parallelism);
+      arePartitionRecordsSorted = partitioner.arePartitionRecordsSorted();
     }
 
     // generate new file ID prefixes for each output partition
     final List<String> fileIDPrefixes =
-        IntStream.range(0, parallelism).mapToObj(i -> FSUtils.createNewFileIdPfx()).collect(Collectors.toList());
+        IntStream.range(0, parallelism).mapToObj(i -> FSUtils.createNewFileIdPfx())

Review comment:
       Same thing here. Our large monitors to the rescue :) Let's make lines 76-77 into a single line if possible.

##########
File path: hudi-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertHelper.java
##########
@@ -56,31 +59,50 @@
 
     final JavaRDD<HoodieRecord<T>> repartitionedRecords;
     final int parallelism = config.getBulkInsertShuffleParallelism();
+    boolean arePartitionRecordsSorted = true;
     if (bulkInsertPartitioner.isPresent()) {
-      repartitionedRecords = bulkInsertPartitioner.get().repartitionRecords(dedupedRecords, parallelism);
+      repartitionedRecords = bulkInsertPartitioner.get()
+          .repartitionRecords(dedupedRecords, parallelism);
+      arePartitionRecordsSorted = bulkInsertPartitioner.get().arePartitionRecordsSorted();
     } else {
-      // Now, sort the records and line them up nicely for loading.
-      repartitionedRecords = dedupedRecords.sortBy(record -> {
-        // Let's use "partitionPath + key" as the sort key. Spark, will ensure
-        // the records split evenly across RDD partitions, such that small partitions fit
-        // into 1 RDD partition, while big ones spread evenly across multiple RDD partitions
-        return String.format("%s+%s", record.getPartitionPath(), record.getRecordKey());
-      }, true, parallelism);
+      BulkInsertInternalPartitioner partitioner =
+          BulkInsertInternalPartitioner.get(config.getBulkInsertSortMode());
+      repartitionedRecords = partitioner.repartitionRecords(dedupedRecords, parallelism);
+      arePartitionRecordsSorted = partitioner.arePartitionRecordsSorted();
     }
 
     // generate new file ID prefixes for each output partition
     final List<String> fileIDPrefixes =
-        IntStream.range(0, parallelism).mapToObj(i -> FSUtils.createNewFileIdPfx()).collect(Collectors.toList());
+        IntStream.range(0, parallelism).mapToObj(i -> FSUtils.createNewFileIdPfx())
+            .collect(Collectors.toList());
 
     table.getActiveTimeline().transitionRequestedToInflight(new HoodieInstant(State.REQUESTED,
         table.getMetaClient().getCommitActionType(), instantTime), Option.empty(),
         config.shouldAllowMultiWriteOnSameInstant());
 
+    /*

Review comment:
       remove? 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-hudi] vinothchandar commented on issue #1149: [WIP] [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on issue #1149: [WIP] [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert
URL: https://github.com/apache/incubator-hudi/pull/1149#issuecomment-576387868
 
 
   @umehrot2 fyi 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [hudi] nsivabalan commented on pull request #1149: [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on pull request #1149:
URL: https://github.com/apache/hudi/pull/1149#issuecomment-668327303


   @n3nash ^ 





[GitHub] [incubator-hudi] n3nash commented on a change in pull request #1149: [WIP] [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert

Posted by GitBox <gi...@apache.org>.
n3nash commented on a change in pull request #1149: [WIP] [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert
URL: https://github.com/apache/incubator-hudi/pull/1149#discussion_r362994623
 
 

 ##########
 File path: hudi-client/src/main/java/org/apache/hudi/func/bulkinsert/BulkInsertMapFunctionForNonSortedRecords.java
 ##########
 @@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.func.bulkinsert;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import org.apache.avro.Schema;
+import org.apache.hudi.WriteStatus;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.func.CopyOnWriteInsertHandler;
+import org.apache.hudi.func.CopyOnWriteLazyInsertIterable.HoodieInsertValueGenResult;
+import org.apache.hudi.table.HoodieTable;
+
+public class BulkInsertMapFunctionForNonSortedRecords<T extends HoodieRecordPayload>
 
 Review comment:
   So in this case, might we end up with a skewed Spark task that writes many files because a large number of records (all with the same partition path) is sent to it?
   @vinothchandar 
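
The skew concern can be illustrated with a simplified model, assuming a writer that rolls over to a new file after a fixed number of records per partition path. This is only a sketch: `SkewSketch`, `filesWritten`, and the `maxRecordsPerFile` parameter are hypothetical and are not taken from the PR or from Hudi's file-sizing logic.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SkewSketch {
  // Number of files a single task writes in this model: records are grouped
  // by partition path, and each group starts a new file every
  // maxRecordsPerFile records.
  static int filesWritten(List<String> partitionPathsSeenByTask, int maxRecordsPerFile) {
    Map<String, Integer> counts = new HashMap<>();
    for (String path : partitionPathsSeenByTask) {
      counts.merge(path, 1, Integer::sum);
    }
    int files = 0;
    for (int count : counts.values()) {
      files += (count + maxRecordsPerFile - 1) / maxRecordsPerFile; // ceiling division
    }
    return files;
  }

  public static void main(String[] args) {
    // One task receives ten records for the same partition path,
    // another receives two records for each of two paths.
    List<String> skewedTask = Arrays.asList(
        "2020/07/31", "2020/07/31", "2020/07/31", "2020/07/31", "2020/07/31",
        "2020/07/31", "2020/07/31", "2020/07/31", "2020/07/31", "2020/07/31");
    List<String> balancedTask = Arrays.asList(
        "2020/08/01", "2020/08/01", "2020/08/02", "2020/08/02");
    System.out.println("skewed task files:   " + filesWritten(skewedTask, 3));
    System.out.println("balanced task files: " + filesWritten(balancedTask, 3));
  }
}
```

In this model the task that receives most of one partition's records ends up writing more files than its peers, which is the situation the question describes.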


[GitHub] [incubator-hudi] vinothchandar commented on a change in pull request #1149: [WIP] [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on a change in pull request #1149: [WIP] [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert
URL: https://github.com/apache/incubator-hudi/pull/1149#discussion_r362295790
 
 

 ##########
 File path: hudi-client/src/main/java/org/apache/hudi/func/bulkinsert/BulkInsertInternalPartitioner.java
 ##########
 @@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.func.bulkinsert;
+
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.table.UserDefinedBulkInsertPartitioner;
+
+public abstract class BulkInsertInternalPartitioner<T extends HoodieRecordPayload> implements
+    UserDefinedBulkInsertPartitioner<T> {
 
 Review comment:
   I think we can rename `UserDefinedBulkInsertPartitioner` to just `BulkInsertPartitioner` and reuse it?


[GitHub] [hudi] nsivabalan commented on pull request #1149: [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on pull request #1149:
URL: https://github.com/apache/hudi/pull/1149#issuecomment-669298777


   We introduced new sort modes and their respective bulk insert partitioners as part of this PR. All of them used the `UserDefinedBulkInsertPartitioner` interface because that was the only interface we had in place, even though the newly added partitioners are not user-defined. So it made sense to rename the interface to `BulkInsertPartitioner`, since it is now used by the newly added, readily available partitioners. But I didn't think too much about existing users, so I'm open to adding a new empty interface called `UserDefinedBulkInsertPartitioner` that extends `BulkInsertPartitioner`.
   My point, though, is that even if we do that, users will still need to make changes to accommodate the new method. So why not have users implement `BulkInsertPartitioner` instead of `UserDefinedBulkInsertPartitioner`, since some changes are required on their end anyway? (The `BulkInsertPartitioner` interface is exactly the same as the old `UserDefinedBulkInsertPartitioner`, with the addition of one method, `arePartitionRecordsSorted()`.)
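
The compatibility option described above (an empty `UserDefinedBulkInsertPartitioner` that extends the renamed interface) could look roughly like the sketch below. It is an illustration under stated assumptions, not the merged code: `List<T>` stands in for `JavaRDD<HoodieRecord<T>>`, and `PartitionerCompatSketch` and `SortingUserPartitioner` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class PartitionerCompatSketch {
  // Stand-in for the renamed interface. The two method names come from the
  // discussion; the List-based signature is a simplification.
  interface BulkInsertPartitioner<T> {
    List<T> repartitionRecords(List<T> records, int outputPartitions);

    boolean arePartitionRecordsSorted();
  }

  // Empty sub-interface that keeps the old name available to existing users.
  interface UserDefinedBulkInsertPartitioner<T> extends BulkInsertPartitioner<T> {
  }

  // A hypothetical user-written partitioner. It compiles against the old name,
  // but it still has to implement the newly added arePartitionRecordsSorted().
  static class SortingUserPartitioner<T extends Comparable<T>>
      implements UserDefinedBulkInsertPartitioner<T> {
    @Override
    public List<T> repartitionRecords(List<T> records, int outputPartitions) {
      List<T> sorted = new ArrayList<>(records);
      Collections.sort(sorted);
      return sorted;
    }

    @Override
    public boolean arePartitionRecordsSorted() {
      return true;
    }
  }

  public static void main(String[] args) {
    BulkInsertPartitioner<String> partitioner = new SortingUserPartitioner<>();
    System.out.println(partitioner.repartitionRecords(Arrays.asList("b", "a", "c"), 1));
    System.out.println(partitioner.arePartitionRecordsSorted());
  }
}
```

The sketch also shows the point made in the comment: keeping the old name avoids a rename on the user side, but an existing implementation must still add the new method.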





[GitHub] [incubator-hudi] cdmikechen edited a comment on issue #1149: [WIP] [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert

Posted by GitBox <gi...@apache.org>.
cdmikechen edited a comment on issue #1149: [WIP] [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert
URL: https://github.com/apache/incubator-hudi/pull/1149#issuecomment-569556382
 
 
   I am also paying close attention to this issue. I think bulk insert should be an efficient way to import data for the first time, just like `jdbc batch insert`.
   Can we show some data on the improvements (data size, data partition size, bulk insert time, sorting time, and so on) using a histogram or line chart? That way, the applicability of each modified method can be better explained.


[GitHub] [hudi] vinothchandar commented on a change in pull request #1149: [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on a change in pull request #1149:
URL: https://github.com/apache/hudi/pull/1149#discussion_r463422364



##########
File path: hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
##########
@@ -161,6 +161,17 @@ public static void writePartitionMetadata(FileSystem fs, String[] partitionPaths
     }
   }
 
+  public static List<HoodieRecord> newHoodieRecords(int n, String time) throws Exception {

Review comment:
       But I actually find it surprising that we needed to add one. This class as-is is a medley of different individual methods. I would really like to avoid overloading it further.







[GitHub] [incubator-hudi] yihua commented on a change in pull request #1149: [WIP] [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert

Posted by GitBox <gi...@apache.org>.
yihua commented on a change in pull request #1149: [WIP] [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert
URL: https://github.com/apache/incubator-hudi/pull/1149#discussion_r362301880
 
 

 ##########
 File path: hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
 ##########
 @@ -77,6 +77,10 @@
   private static final String DEFAULT_HOODIE_WRITE_STATUS_CLASS = WriteStatus.class.getName();
   private static final String FINALIZE_WRITE_PARALLELISM = "hoodie.finalize.write.parallelism";
   private static final String DEFAULT_FINALIZE_WRITE_PARALLELISM = DEFAULT_PARALLELISM;
+  private static final String BULKINSERT_SORT_ENABLED = "hoodie.bulkinsert.sort.enable";
 
 Review comment:
   Yes, one config is easier than two for the user to reason about.  I'll make the change accordingly.
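
A single sort-mode setting could be modeled as one enum-valued property instead of an enable flag plus a type. The sketch below is an assumption-laden illustration: the mode names `GLOBAL_SORT`, `PARTITION_SORT` and `NONE` appear in the PR's test, while the key `hoodie.bulkinsert.sort.mode`, the `GLOBAL_SORT` default (chosen here because the pre-existing behavior was a global `sortBy`), and the class `SortModeConfigSketch` are hypothetical.

```java
import java.util.Locale;
import java.util.Properties;

public class SortModeConfigSketch {
  // Mode names as used in the PR's test parameters.
  enum BulkInsertSortMode { NONE, GLOBAL_SORT, PARTITION_SORT }

  // Hypothetical key and default; neither is confirmed by this thread.
  static final String BULKINSERT_SORT_MODE = "hoodie.bulkinsert.sort.mode";
  static final String DEFAULT_BULKINSERT_SORT_MODE = BulkInsertSortMode.GLOBAL_SORT.name();

  // Resolve the mode from a single property, case-insensitively.
  // Unknown values fail fast with IllegalArgumentException from valueOf().
  static BulkInsertSortMode getBulkInsertSortMode(Properties props) {
    String mode = props.getProperty(BULKINSERT_SORT_MODE, DEFAULT_BULKINSERT_SORT_MODE);
    return BulkInsertSortMode.valueOf(mode.trim().toUpperCase(Locale.ROOT));
  }

  public static void main(String[] args) {
    Properties props = new Properties();
    System.out.println(getBulkInsertSortMode(props));
    props.setProperty(BULKINSERT_SORT_MODE, "partition_sort");
    System.out.println(getBulkInsertSortMode(props));
  }
}
```

With one property, "sorting disabled" is just another mode (`NONE`), so there is no combination of flags that can contradict itself.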


[GitHub] [incubator-hudi] vinothchandar commented on a change in pull request #1149: [WIP] [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on a change in pull request #1149: [WIP] [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert
URL: https://github.com/apache/incubator-hudi/pull/1149#discussion_r376236746
 
 

 ##########
 File path: hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java
 ##########
 @@ -381,20 +384,30 @@ public static SparkConf registerClasses(SparkConf conf) {
     }
   }
 
+  private BulkInsertMapFunction<T> getBulkInsertMapFunction(
 
 Review comment:
   I was looking at this for a different JIRA, but having a class for `BulkInsertMapFunction` seems like overkill. Can we just replace it with lambdas and kill the class?
   
   E.g., in the code on master, just change it to the below:
   
   ```
    JavaRDD<WriteStatus> writeStatusRDD = repartitionedRecords
           .mapPartitionsWithIndex((partition, recordItr) ->
               new CopyOnWriteLazyInsertIterable<>(recordItr, config, commitTime, table, fileIDPrefixes.get(partition)), true)
           .flatMap(List::iterator);
   
   ```
   
   If you agree, just do it in this PR. That way I won't introduce an unnecessary rebase for you :) 


[GitHub] [hudi] yihua commented on pull request #1149: [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert

Posted by GitBox <gi...@apache.org>.
yihua commented on pull request #1149:
URL: https://github.com/apache/hudi/pull/1149#issuecomment-667345505


   @nsivabalan @n3nash  I just recalled that Uber internally extends `UserDefinedBulkInsertPartitioner` to implement custom partitioners, so renaming the `UserDefinedBulkInsertPartitioner` class is a breaking change.
   
   Shall we keep the name `UserDefinedBulkInsertPartitioner`?  We can do this in a separate PR.





[GitHub] [hudi] yihua commented on a change in pull request #1149: [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert

Posted by GitBox <gi...@apache.org>.
yihua commented on a change in pull request #1149:
URL: https://github.com/apache/hudi/pull/1149#discussion_r463441657



##########
File path: hudi-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitioner.java
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.execution.bulkinsert;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.testutils.HoodieClientTestBase;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.newHoodieRecords;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestBulkInsertInternalPartitioner extends HoodieClientTestBase {
+
+  public static JavaRDD<HoodieRecord> generateTestRecordsForBulkInsert(JavaSparkContext jsc)
+      throws Exception {
+    // RDD partition 1
+    List<HoodieRecord> records1 = newHoodieRecords(3, "2020-07-31T03:16:41.415Z");
+    records1.addAll(newHoodieRecords(2, "2020-08-01T03:16:41.415Z"));
+    records1.addAll(newHoodieRecords(5, "2020-07-31T03:16:41.415Z"));
+    // RDD partition 2
+    List<HoodieRecord> records2 = newHoodieRecords(4, "2020-08-02T03:16:22.415Z");
+    records2.addAll(newHoodieRecords(1, "2020-07-31T03:16:41.415Z"));
+    records2.addAll(newHoodieRecords(5, "2020-08-01T06:16:41.415Z"));
+    return jsc.parallelize(records1, 1).union(jsc.parallelize(records2, 1));
+  }
+
+  public static Map<String, Long> generateExpectedPartitionNumRecords() {
+    Map<String, Long> expectedPartitionNumRecords = new HashMap<>();
+    expectedPartitionNumRecords.put("2020/07/31", 9L);
+    expectedPartitionNumRecords.put("2020/08/01", 7L);
+    expectedPartitionNumRecords.put("2020/08/02", 4L);
+    return expectedPartitionNumRecords;
+  }
+
+  private static JavaRDD<HoodieRecord> generateTripleTestRecordsForBulkInsert(JavaSparkContext jsc)
+      throws Exception {
+    return generateTestRecordsForBulkInsert(jsc).union(generateTestRecordsForBulkInsert(jsc))
+        .union(generateTestRecordsForBulkInsert(jsc));
+  }
+
+  public static Map<String, Long> generateExpectedPartitionNumRecordsTriple() {
+    Map<String, Long> expectedPartitionNumRecords = generateExpectedPartitionNumRecords();
+    for (String partitionPath : expectedPartitionNumRecords.keySet()) {
+      expectedPartitionNumRecords.put(partitionPath,
+          expectedPartitionNumRecords.get(partitionPath) * 3);
+    }
+    return expectedPartitionNumRecords;
+  }
+
+  private static Stream<Arguments> configParams() {
+    Object[][] data = new Object[][] {
+        {BulkInsertInternalPartitioner.BulkInsertSortMode.GLOBAL_SORT, true, true},
+        {BulkInsertInternalPartitioner.BulkInsertSortMode.PARTITION_SORT, false, true},
+        {BulkInsertInternalPartitioner.BulkInsertSortMode.NONE, false, false}
+    };
+    return Stream.of(data).map(Arguments::of);
+  }
+
+  private void verifyRecordAscendingOrder(Iterator<HoodieRecord> records) {
+    HoodieRecord prevRecord = null;
+
+    for (Iterator<HoodieRecord> it = records; it.hasNext(); ) {

Review comment:
       Yes, makes sense.  Fixed.







[GitHub] [hudi] nsivabalan commented on a change in pull request #1149: [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on a change in pull request #1149:
URL: https://github.com/apache/hudi/pull/1149#discussion_r463303157



##########
File path: hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
##########
@@ -161,6 +161,17 @@ public static void writePartitionMetadata(FileSystem fs, String[] partitionPaths
     }
   }
 
+  public static List<HoodieRecord> newHoodieRecords(int n, String time) throws Exception {

Review comment:
       I see `TestRawTripPayload(String jsonData)` is being used in many places. I guess we can just move this method to `HoodieTestDataGenerator`.







[GitHub] [incubator-hudi] cdmikechen edited a comment on issue #1149: [WIP] [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert

Posted by GitBox <gi...@apache.org>.
cdmikechen edited a comment on issue #1149: [WIP] [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert
URL: https://github.com/apache/incubator-hudi/pull/1149#issuecomment-569556382
 
 
   I am also paying close attention to this issue. I think bulk insert should be an efficient way to import data, just like a JDBC batch insert.
   Can we show some data on the improvements (data size, data partition size, bulk insert time, sorting time, and so on) using a histogram or line chart? That way, the applicability of each modified method can be better explained.


[GitHub] [hudi] nsivabalan merged pull request #1149: [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert

Posted by GitBox <gi...@apache.org>.
nsivabalan merged pull request #1149:
URL: https://github.com/apache/hudi/pull/1149


   





[GitHub] [hudi] codecov-commenter edited a comment on pull request #1149: [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert

Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #1149:
URL: https://github.com/apache/hudi/pull/1149#issuecomment-652734921


   # [Codecov](https://codecov.io/gh/apache/hudi/pull/1149?src=pr&el=h1) Report
   > Merging [#1149](https://codecov.io/gh/apache/hudi/pull/1149?src=pr&el=desc) into [master](https://codecov.io/gh/apache/hudi/commit/b71f25f210c4004a2dcc97a9967399e74f870fc7&el=desc) will **increase** coverage by `9.65%`.
   > The diff coverage is `52.50%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/hudi/pull/1149/graphs/tree.svg?width=650&height=150&src=pr&token=VTTXabwbs2)](https://codecov.io/gh/apache/hudi/pull/1149?src=pr&el=tree)
   
   ```diff
   @@             Coverage Diff              @@
   ##             master    #1149      +/-   ##
   ============================================
   + Coverage     46.45%   56.10%   +9.65%     
   - Complexity     2811     3498     +687     
   ============================================
     Files           442      454      +12     
     Lines         19131    19373     +242     
     Branches       1931     1942      +11     
   ============================================
   + Hits           8888    10870    +1982     
   + Misses         9483     7771    -1712     
   + Partials        760      732      -28     
   ```
   
   | Flag | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | #hudicli | `68.65% <52.50%> (+38.81%)` | `1489.00 <20.00> (+919.00)` | |
   | #hudiclient | `79.01% <52.50%> (+52.40%)` | `1317.00 <20.00> (+919.00)` | |
   | #hudicommon | `54.81% <ø> (+0.02%)` | `1508.00 <ø> (-1.00)` | :arrow_up: |
   | #hudihadoopmr | `38.99% <ø> (ø)` | `163.00 <ø> (ø)` | |
   | #hudihivesync | `72.25% <ø> (ø)` | `121.00 <ø> (ø)` | |
   | #hudispark | `49.24% <ø> (ø)` | `122.00 <ø> (ø)` | |
   | #huditimelineservice | `63.47% <ø> (ø)` | `47.00 <ø> (ø)` | |
   | #hudiutilities | `12.07% <ø> (-62.58%)` | `48.00 <ø> (-231.00)` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/hudi/pull/1149?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | [...sert/BulkInsertMapFunctionForNonSortedRecords.java](https://codecov.io/gh/apache/hudi/pull/1149/diff?src=pr&el=tree#diff-aHVkaS1jbGllbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvZXhlY3V0aW9uL2J1bGtpbnNlcnQvQnVsa0luc2VydE1hcEZ1bmN0aW9uRm9yTm9uU29ydGVkUmVjb3Jkcy5qYXZh) | `0.00% <0.00%> (ø)` | `0.00 <0.00> (?)` | |
   | [.../hudi/execution/bulkinsert/NonSortPartitioner.java](https://codecov.io/gh/apache/hudi/pull/1149/diff?src=pr&el=tree#diff-aHVkaS1jbGllbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvZXhlY3V0aW9uL2J1bGtpbnNlcnQvTm9uU29ydFBhcnRpdGlvbmVyLmphdmE=) | `0.00% <0.00%> (ø)` | `0.00 <0.00> (?)` | |
   | [...n/bulkinsert/RDDPartitionLocalSortPartitioner.java](https://codecov.io/gh/apache/hudi/pull/1149/diff?src=pr&el=tree#diff-aHVkaS1jbGllbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvZXhlY3V0aW9uL2J1bGtpbnNlcnQvUkREUGFydGl0aW9uTG9jYWxTb3J0UGFydGl0aW9uZXIuamF2YQ==) | `0.00% <0.00%> (ø)` | `0.00 <0.00> (?)` | |
   | [...ution/bulkinsert/RDDPartitionRangePartitioner.java](https://codecov.io/gh/apache/hudi/pull/1149/diff?src=pr&el=tree#diff-aHVkaS1jbGllbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvZXhlY3V0aW9uL2J1bGtpbnNlcnQvUkREUGFydGl0aW9uUmFuZ2VQYXJ0aXRpb25lci5qYXZh) | `0.00% <0.00%> (ø)` | `0.00 <0.00> (?)` | |
   | [...tion/bulkinsert/BulkInsertInternalPartitioner.java](https://codecov.io/gh/apache/hudi/pull/1149/diff?src=pr&el=tree#diff-aHVkaS1jbGllbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvZXhlY3V0aW9uL2J1bGtpbnNlcnQvQnVsa0luc2VydEludGVybmFsUGFydGl0aW9uZXIuamF2YQ==) | `53.84% <53.84%> (ø)` | `2.00 <2.00> (?)` | |
   | [...che/hudi/table/action/commit/BulkInsertHelper.java](https://codecov.io/gh/apache/hudi/pull/1149/diff?src=pr&el=tree#diff-aHVkaS1jbGllbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdGFibGUvYWN0aW9uL2NvbW1pdC9CdWxrSW5zZXJ0SGVscGVyLmphdmE=) | `74.19% <60.00%> (+74.19%)` | `3.00 <1.00> (+3.00)` | |
   | [...java/org/apache/hudi/config/HoodieWriteConfig.java](https://codecov.io/gh/apache/hudi/pull/1149/diff?src=pr&el=tree#diff-aHVkaS1jbGllbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY29uZmlnL0hvb2RpZVdyaXRlQ29uZmlnLmphdmE=) | `79.32% <83.33%> (+35.84%)` | `90.00 <1.00> (+33.00)` | |
   | [...pache/hudi/execution/CopyOnWriteInsertHandler.java](https://codecov.io/gh/apache/hudi/pull/1149/diff?src=pr&el=tree#diff-aHVkaS1jbGllbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvZXhlY3V0aW9uL0NvcHlPbldyaXRlSW5zZXJ0SGFuZGxlci5qYXZh) | `94.11% <94.11%> (ø)` | `8.00 <8.00> (?)` | |
   | [.../org/apache/hudi/execution/LazyInsertIterable.java](https://codecov.io/gh/apache/hudi/pull/1149/diff?src=pr&el=tree#diff-aHVkaS1jbGllbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvZXhlY3V0aW9uL0xhenlJbnNlcnRJdGVyYWJsZS5qYXZh) | `79.41% <100.00%> (+6.86%)` | `8.00 <1.00> (ø)` | |
   | [...di/execution/bulkinsert/BulkInsertMapFunction.java](https://codecov.io/gh/apache/hudi/pull/1149/diff?src=pr&el=tree#diff-aHVkaS1jbGllbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvZXhlY3V0aW9uL2J1bGtpbnNlcnQvQnVsa0luc2VydE1hcEZ1bmN0aW9uLmphdmE=) | `75.00% <100.00%> (ø)` | `1.00 <1.00> (?)` | |
   | ... and [157 more](https://codecov.io/gh/apache/hudi/pull/1149/diff?src=pr&el=tree-more) | |
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-hudi] yihua commented on issue #1149: [WIP] [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert

Posted by GitBox <gi...@apache.org>.
yihua commented on issue #1149: [WIP] [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert
URL: https://github.com/apache/incubator-hudi/pull/1149#issuecomment-569985331
 
 
   @cdmikechen Thanks for the suggestion.  I'm actually working on benchmarking each mode with different types of workload.  Once I have some data points I'll share them.


[GitHub] [hudi] n3nash commented on pull request #1149: [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert

Posted by GitBox <gi...@apache.org>.
n3nash commented on pull request #1149:
URL: https://github.com/apache/hudi/pull/1149#issuecomment-669315726


   @nsivabalan thanks for the context. I see what you're saying. It's most likely Uber using this special partitioner. A change is required in either scenario, so changing the name is fine; it keeps things clear without the need for a placeholder abstract class. We will absorb the backwards-incompatible change on our end.





[GitHub] [hudi] nsivabalan commented on pull request #1149: [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on pull request #1149:
URL: https://github.com/apache/hudi/pull/1149#issuecomment-667721158


   I thought of adding an empty interface called UserDefinedBulkInsertPartitioner, but BulkInsertPartitioner itself has a new method, `boolean arePartitionRecordsSorted()`. Existing users will therefore have to change their code to implement this method anyway, so I'm not sure there is any benefit in adding an empty UserDefinedBulkInsertPartitioner interface that extends BulkInsertPartitioner.





[GitHub] [incubator-hudi] n3nash commented on a change in pull request #1149: [WIP] [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert

Posted by GitBox <gi...@apache.org>.
n3nash commented on a change in pull request #1149: [WIP] [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert
URL: https://github.com/apache/incubator-hudi/pull/1149#discussion_r362994362
 
 

 ##########
 File path: hudi-client/src/main/java/org/apache/hudi/func/bulkinsert/BulkInsertInternalPartitioner.java
 ##########
 @@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.func.bulkinsert;
+
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.table.UserDefinedBulkInsertPartitioner;
+
+public abstract class BulkInsertInternalPartitioner<T extends HoodieRecordPayload> implements
+    UserDefinedBulkInsertPartitioner<T> {
 
 Review comment:
   Can we just keep UserDefinedBulkInsertPartitioner as an interface and provide an abstract class here? It might be fine to change this at Uber, but it would be best to avoid a breaking change, since doing so is simple in this case.


[GitHub] [incubator-hudi] vinothchandar commented on a change in pull request #1149: [WIP] [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on a change in pull request #1149: [WIP] [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert
URL: https://github.com/apache/incubator-hudi/pull/1149#discussion_r376236783
 
 

 ##########
 File path: hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java
 ##########
 @@ -381,20 +384,30 @@ public static SparkConf registerClasses(SparkConf conf) {
     }
   }
 
+  private BulkInsertMapFunction<T> getBulkInsertMapFunction(
 
 Review comment:
   cc @yihua  


[GitHub] [hudi] vinothchandar commented on pull request #1149: [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on pull request #1149:
URL: https://github.com/apache/hudi/pull/1149#issuecomment-662584698


   >Do you think we need this? If not, I can remove this class (also getting rid of the scala imports).
   
   @yihua yes, let's remove this for now. globally_sorted, local_sorted, and no_sort should be good enough for now.





[GitHub] [hudi] n3nash commented on pull request #1149: [WIP] [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert

Posted by GitBox <gi...@apache.org>.
n3nash commented on pull request #1149:
URL: https://github.com/apache/hudi/pull/1149#issuecomment-643387369


   @yihua Is this still failing in the MergeHandle?





[GitHub] [incubator-hudi] vinothchandar commented on a change in pull request #1149: [WIP] [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on a change in pull request #1149: [WIP] [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert
URL: https://github.com/apache/incubator-hudi/pull/1149#discussion_r362295839
 
 

 ##########
 File path: hudi-client/src/main/java/org/apache/hudi/func/bulkinsert/BulkInsertInternalPartitioner.java
 ##########
 @@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.func.bulkinsert;
+
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.table.UserDefinedBulkInsertPartitioner;
+
+public abstract class BulkInsertInternalPartitioner<T extends HoodieRecordPayload> implements
+    UserDefinedBulkInsertPartitioner<T> {
 
 Review comment:
   RDD API clients, i.e. Uber/marmaray, may need to change the name of the interface they implement. It should be fine IMO. cc @bvaradar @n3nash


[GitHub] [hudi] vinothchandar commented on pull request #1149: [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on pull request #1149:
URL: https://github.com/apache/hudi/pull/1149#issuecomment-665155548


   >For the default sort mode for bulk insert, shall we set it to a mode other than GLOBAL_SORT?
   
   Yes, sounds good. We can retain the existing behavior.





[GitHub] [incubator-hudi] n3nash commented on a change in pull request #1149: [WIP] [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert

Posted by GitBox <gi...@apache.org>.
n3nash commented on a change in pull request #1149: [WIP] [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert
URL: https://github.com/apache/incubator-hudi/pull/1149#discussion_r362993888
 
 

 ##########
 File path: hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java
 ##########
 @@ -367,20 +370,30 @@ public static SparkConf registerClasses(SparkConf conf) {
     }
   }
 
+  private BulkInsertMapFunction<T> getBulkInsertMapFunction(
+      boolean isSorted, String commitTime, HoodieWriteConfig config, HoodieTable<T> hoodieTable,
+      List<String> fileIDPrefixes) {
+    if (isSorted) {
+      return new BulkInsertMapFunctionForSortedRecords(
+          commitTime, config, hoodieTable, fileIDPrefixes);
+    }
+    return new BulkInsertMapFunctionForNonSortedRecords(
+        commitTime, config, hoodieTable, fileIDPrefixes);
+  }
+
   private JavaRDD<WriteStatus> bulkInsertInternal(JavaRDD<HoodieRecord<T>> dedupedRecords, String commitTime,
       HoodieTable<T> table, Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner) {
 
 Review comment:
   @yihua Yes, we do need to keep this functionality; as @vinothchandar pointed out, Uber uses this. We directly pass an implementation rather than a config.


[GitHub] [incubator-hudi] vinothchandar commented on a change in pull request #1149: [WIP] [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on a change in pull request #1149: [WIP] [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert
URL: https://github.com/apache/incubator-hudi/pull/1149#discussion_r362295734
 
 

 ##########
 File path: hudi-client/src/main/java/org/apache/hudi/func/CopyOnWriteInsertHandler.java
 ##########
 @@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.func;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hudi.WriteStatus;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.func.CopyOnWriteLazyInsertIterable.HoodieInsertValueGenResult;
+import org.apache.hudi.io.HoodieCreateHandle;
+import org.apache.hudi.io.HoodieWriteHandle;
+import org.apache.hudi.table.HoodieTable;
+
+/**
+ * Consumes stream of hoodie records from in-memory queue and writes to one or more create-handles.
 
 Review comment:
   Improve the javadocs? 


[GitHub] [hudi] vinothchandar commented on a change in pull request #1149: [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on a change in pull request #1149:
URL: https://github.com/apache/hudi/pull/1149#discussion_r457839262



##########
File path: hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
##########
@@ -245,6 +250,16 @@ public int getMaxConsistencyCheckIntervalMs() {
     return Integer.parseInt(props.getProperty(MAX_CONSISTENCY_CHECK_INTERVAL_MS_PROP));
   }
 
+  public BulkInsertSortMode getBulkInsertSortMode() {
+    String sortMode = props.getProperty(BULKINSERT_SORT_MODE);
+    try {
+      return BulkInsertSortMode.valueOf(sortMode.toUpperCase());
+    } catch (IllegalArgumentException e) {

Review comment:
       Given that IllegalArgumentException is itself a runtime exception, it may be OK to just let it propagate.
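
To illustrate the point about letting the exception propagate, here is a minimal, self-contained sketch. The `SortMode` enum and the `parse` helper are hypothetical stand-ins (only `GLOBAL_SORT` is named in this thread; the other constants are placeholders), not the PR's actual code. It relies only on the fact that `Enum.valueOf` already throws `IllegalArgumentException` for an unknown name.

```java
import java.util.Locale;

// Hypothetical stand-in for the sort-mode enum; only GLOBAL_SORT is named in this thread.
enum SortMode { GLOBAL_SORT, LOCAL_SORT, NONE }

final class SortModeParsing {
  // Enum.valueOf throws IllegalArgumentException for an unknown name,
  // so the runtime exception reaches the caller without any try/catch here.
  static SortMode parse(String raw) {
    return SortMode.valueOf(raw.toUpperCase(Locale.ROOT));
  }

  public static void main(String[] args) {
    System.out.println(parse("global_sort"));
    try {
      parse("bogus");
    } catch (IllegalArgumentException e) {
      // Caught here only to show that the failure surfaces to the caller.
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```

Note that a missing property (a null string) would fail with a `NullPointerException` at `toUpperCase` rather than an `IllegalArgumentException`, so a default value may still be worth setting.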

##########
File path: hudi-client/src/main/java/org/apache/hudi/execution/CopyOnWriteInsertHandler.java
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.execution;
+
+import org.apache.hudi.client.SparkTaskContextSupplier;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.execution.LazyInsertIterable.HoodieInsertValueGenResult;
+import org.apache.hudi.io.HoodieWriteHandle;
+import org.apache.hudi.io.WriteHandleFactory;
+import org.apache.hudi.table.HoodieTable;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Consumes stream of hoodie records from in-memory queue and writes to one or more create-handles.
+ */
+public class CopyOnWriteInsertHandler<T extends HoodieRecordPayload>
+    extends

Review comment:
       Let's fix the alignment, maybe here.

##########
File path: hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
##########
@@ -632,6 +647,10 @@ public FileSystemViewStorageConfig getClientSpecifiedViewStorageConfig() {
     return clientSpecifiedViewStorageConfig;
   }
 
+  public boolean getStringFormation() {
+    return Boolean.parseBoolean(props.getProperty("hoodie.tmp.string.format"));

Review comment:
       What's this?







[GitHub] [hudi] codecov-commenter edited a comment on pull request #1149: [WIP] [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert

Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #1149:
URL: https://github.com/apache/hudi/pull/1149#issuecomment-652734921


   # [Codecov](https://codecov.io/gh/apache/hudi/pull/1149?src=pr&el=h1) Report
   > Merging [#1149](https://codecov.io/gh/apache/hudi/pull/1149?src=pr&el=desc) into [master](https://codecov.io/gh/apache/hudi/commit/2be924fd3a04e2106f1ad3f9ce91890ea58c8a88&el=desc) will **increase** coverage by `0.27%`.
   > The diff coverage is `50.38%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/hudi/pull/1149/graphs/tree.svg?width=650&height=150&src=pr&token=VTTXabwbs2)](https://codecov.io/gh/apache/hudi/pull/1149?src=pr&el=tree)
   
   ```diff
   @@             Coverage Diff              @@
   ##             master    #1149      +/-   ##
   ============================================
   + Coverage     60.38%   60.66%   +0.27%     
   - Complexity     3605     3686      +81     
   ============================================
     Files           436      450      +14     
     Lines         18949    19236     +287     
     Branches       1914     1933      +19     
   ============================================
   + Hits          11442    11669     +227     
   - Misses         6719     6768      +49     
   - Partials        788      799      +11     
   ```
   
   | Flag | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | #hudicli | `68.06% <50.38%> (-0.29%)` | `1445.00 <21.00> (+19.00)` | :arrow_down: |
   | #hudiclient | `78.51% <50.38%> (-0.66%)` | `1273.00 <21.00> (+19.00)` | :arrow_down: |
   | #hudicommon | `54.78% <ø> (+0.50%)` | `1509.00 <ø> (+24.00)` | |
   | #hudihadoopmr | `38.99% <ø> (-0.37%)` | `163.00 <ø> (ø)` | |
   | #hudihivesync | `72.25% <ø> (ø)` | `121.00 <ø> (ø)` | |
   | #hudispark | `49.24% <ø> (+5.01%)` | `122.00 <ø> (+46.00)` | |
   | #huditimelineservice | `63.47% <ø> (ø)` | `47.00 <ø> (ø)` | |
   | #hudiutilities | `74.64% <ø> (+0.89%)` | `279.00 <ø> (-8.00)` | :arrow_up: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/hudi/pull/1149?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | [...sert/BulkInsertMapFunctionForNonSortedRecords.java](https://codecov.io/gh/apache/hudi/pull/1149/diff?src=pr&el=tree#diff-aHVkaS1jbGllbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvZXhlY3V0aW9uL2J1bGtpbnNlcnQvQnVsa0luc2VydE1hcEZ1bmN0aW9uRm9yTm9uU29ydGVkUmVjb3Jkcy5qYXZh) | `0.00% <0.00%> (ø)` | `0.00 <0.00> (?)` | |
   | [.../hudi/execution/bulkinsert/NonSortPartitioner.java](https://codecov.io/gh/apache/hudi/pull/1149/diff?src=pr&el=tree#diff-aHVkaS1jbGllbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvZXhlY3V0aW9uL2J1bGtpbnNlcnQvTm9uU29ydFBhcnRpdGlvbmVyLmphdmE=) | `0.00% <0.00%> (ø)` | `0.00 <0.00> (?)` | |
   | [...n/bulkinsert/RDDPartitionLocalSortPartitioner.java](https://codecov.io/gh/apache/hudi/pull/1149/diff?src=pr&el=tree#diff-aHVkaS1jbGllbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvZXhlY3V0aW9uL2J1bGtpbnNlcnQvUkREUGFydGl0aW9uTG9jYWxTb3J0UGFydGl0aW9uZXIuamF2YQ==) | `0.00% <0.00%> (ø)` | `0.00 <0.00> (?)` | |
   | [...ution/bulkinsert/RDDPartitionRangePartitioner.java](https://codecov.io/gh/apache/hudi/pull/1149/diff?src=pr&el=tree#diff-aHVkaS1jbGllbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvZXhlY3V0aW9uL2J1bGtpbnNlcnQvUkREUGFydGl0aW9uUmFuZ2VQYXJ0aXRpb25lci5qYXZh) | `0.00% <0.00%> (ø)` | `0.00 <0.00> (?)` | |
   | [...tion/bulkinsert/BulkInsertInternalPartitioner.java](https://codecov.io/gh/apache/hudi/pull/1149/diff?src=pr&el=tree#diff-aHVkaS1jbGllbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvZXhlY3V0aW9uL2J1bGtpbnNlcnQvQnVsa0luc2VydEludGVybmFsUGFydGl0aW9uZXIuamF2YQ==) | `53.84% <53.84%> (ø)` | `2.00 <2.00> (?)` | |
   | [...java/org/apache/hudi/config/HoodieWriteConfig.java](https://codecov.io/gh/apache/hudi/pull/1149/diff?src=pr&el=tree#diff-aHVkaS1jbGllbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY29uZmlnL0hvb2RpZVdyaXRlQ29uZmlnLmphdmE=) | `79.08% <60.00%> (-0.76%)` | `90.00 <2.00> (+2.00)` | :arrow_down: |
   | [...che/hudi/table/action/commit/BulkInsertHelper.java](https://codecov.io/gh/apache/hudi/pull/1149/diff?src=pr&el=tree#diff-aHVkaS1jbGllbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdGFibGUvYWN0aW9uL2NvbW1pdC9CdWxrSW5zZXJ0SGVscGVyLmphdmE=) | `75.00% <62.50%> (-10.72%)` | `3.00 <1.00> (-1.00)` | |
   | [...pache/hudi/execution/CopyOnWriteInsertHandler.java](https://codecov.io/gh/apache/hudi/pull/1149/diff?src=pr&el=tree#diff-aHVkaS1jbGllbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvZXhlY3V0aW9uL0NvcHlPbldyaXRlSW5zZXJ0SGFuZGxlci5qYXZh) | `94.11% <94.11%> (ø)` | `8.00 <8.00> (?)` | |
   | [.../org/apache/hudi/execution/LazyInsertIterable.java](https://codecov.io/gh/apache/hudi/pull/1149/diff?src=pr&el=tree#diff-aHVkaS1jbGllbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvZXhlY3V0aW9uL0xhenlJbnNlcnRJdGVyYWJsZS5qYXZh) | `79.41% <100.00%> (-0.99%)` | `8.00 <1.00> (ø)` | |
   | [...di/execution/bulkinsert/BulkInsertMapFunction.java](https://codecov.io/gh/apache/hudi/pull/1149/diff?src=pr&el=tree#diff-aHVkaS1jbGllbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvZXhlY3V0aW9uL2J1bGtpbnNlcnQvQnVsa0luc2VydE1hcEZ1bmN0aW9uLmphdmE=) | `75.00% <100.00%> (ø)` | `1.00 <1.00> (?)` | |
   | ... and [54 more](https://codecov.io/gh/apache/hudi/pull/1149/diff?src=pr&el=tree-more) | |
   





[GitHub] [incubator-hudi] yihua commented on a change in pull request #1149: [WIP] [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert

Posted by GitBox <gi...@apache.org>.
yihua commented on a change in pull request #1149: [WIP] [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert
URL: https://github.com/apache/incubator-hudi/pull/1149#discussion_r362275524
 
 

 ##########
 File path: hudi-client/src/main/java/org/apache/hudi/func/bulkinsert/RDDPartitionLocalSortPartitioner.java
 ##########
 @@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.func.bulkinsert;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.spark.HashPartitioner;
+import org.apache.spark.api.java.JavaRDD;
+import scala.Tuple2;
+
+public class RDDPartitionLocalSortPartitioner<T extends HoodieRecordPayload>
+    extends BulkInsertInternalPartitioner<T> {
+
+  @Override
+  public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> records,
+      int outputSparkPartitions) {
+    return records.mapToPair(record ->
+        new Tuple2<>(
+            String.format("%s+%s", record.getPartitionPath(), record.getRecordKey()), record))
+        .repartitionAndSortWithinPartitions(new HashPartitioner(outputSparkPartitions))
 
 Review comment:
   Yes, `RangePartitioner` is actually what I'm trying to look for.
   
   This specific partitioner tries to avoid shuffling to speed up the bulk insert, but it may introduce overlapping ranges. I will check the side effects of this.
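
The overlapping-range concern can be seen in a plain-Java sketch with no Spark dependency. This is an illustration under stated assumptions, not the PR's code: `hashPartitionAndSort` and `rangesOverlap` are hypothetical helpers that mimic hash partitioning (non-negative modulo of `hashCode`) followed by a sort inside each partition.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class LocalSortSketch {
  // Mimics hash partitioning followed by a per-partition sort, in plain Java.
  static List<List<String>> hashPartitionAndSort(List<String> keys, int numPartitions) {
    List<List<String>> partitions = new ArrayList<>();
    for (int i = 0; i < numPartitions; i++) {
      partitions.add(new ArrayList<>());
    }
    for (String key : keys) {
      // The non-negative modulo of the hash code picks the target partition.
      partitions.get(Math.floorMod(key.hashCode(), numPartitions)).add(key);
    }
    for (List<String> partition : partitions) {
      Collections.sort(partition);
    }
    return partitions;
  }

  // True if two sorted, non-empty partitions cover overlapping key ranges.
  static boolean rangesOverlap(List<String> a, List<String> b) {
    String aMin = a.get(0);
    String aMax = a.get(a.size() - 1);
    String bMin = b.get(0);
    String bMax = b.get(b.size() - 1);
    return aMin.compareTo(bMax) <= 0 && bMin.compareTo(aMax) <= 0;
  }

  public static void main(String[] args) {
    List<String> keys = Arrays.asList("a", "b", "c", "d", "e", "f");
    List<List<String>> parts = hashPartitionAndSort(keys, 2);
    System.out.println(parts);
    System.out.println("overlap: " + rangesOverlap(parts.get(0), parts.get(1)));
  }
}
```

Each partition is sorted internally, yet neighbouring keys land in different partitions, so the key ranges of the partitions interleave. A range-based partitioner would instead assign contiguous key ranges to each partition.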


[GitHub] [incubator-hudi] vinothchandar commented on a change in pull request #1149: [WIP] [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on a change in pull request #1149: [WIP] [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert
URL: https://github.com/apache/incubator-hudi/pull/1149#discussion_r362295936
 
 

 ##########
 File path: hudi-client/src/main/java/org/apache/hudi/func/bulkinsert/NonSortPartitioner.java
 ##########
 @@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.func.bulkinsert;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.spark.api.java.JavaRDD;
+
+public class NonSortPartitioner<T extends HoodieRecordPayload>
 
 Review comment:
   More like `NonShufflingPartitioner`? 


[GitHub] [hudi] nsivabalan commented on pull request #1149: [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on pull request #1149:
URL: https://github.com/apache/hudi/pull/1149#issuecomment-669299989


   I also thought we could add an abstract class `UserDefinedBulkInsertPartitioner` and make `arePartitionRecordsSorted()` return false by default, but this would also require users to change from "implements" to "extends" ;) 
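
The trade-off described above can be sketched in a few lines. This is only an illustration under stated assumptions: the types below are hypothetical stand-ins (plain lists instead of `JavaRDD<HoodieRecord<T>>`, and made-up class names), not the code in this PR. It shows an abstract base class supplying `arePartitionRecordsSorted()` as false by default, so a custom partitioner does not need to implement the new method but must use `extends` instead of `implements`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for the partitioner interface:
// List<List<String>> replaces the Spark RDD, strings replace Hudi records.
interface PartitionerSketch {
  List<List<String>> repartitionRecords(List<String> records, int outputPartitions);

  boolean arePartitionRecordsSorted();
}

// The abstract-class option: records are reported as unsorted unless overridden.
abstract class UnsortedByDefaultPartitionerSketch implements PartitionerSketch {
  @Override
  public boolean arePartitionRecordsSorted() {
    return false;
  }
}

// A user-defined partitioner now has to "extend" the base class rather than "implement" the interface.
class RoundRobinPartitionerSketch extends UnsortedByDefaultPartitionerSketch {
  @Override
  public List<List<String>> repartitionRecords(List<String> records, int outputPartitions) {
    List<List<String>> partitions = new ArrayList<>();
    for (int i = 0; i < outputPartitions; i++) {
      partitions.add(new ArrayList<>());
    }
    // Distribute records round-robin; no ordering is imposed inside a partition.
    for (int i = 0; i < records.size(); i++) {
      partitions.get(i % outputPartitions).add(records.get(i));
    }
    return partitions;
  }
}
```

The cost is visible in the last class declaration: every existing implementation would need the same one-word change, which is the incompatibility the comment points out.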





[GitHub] [hudi] vinothchandar commented on a change in pull request #1149: [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on a change in pull request #1149:
URL: https://github.com/apache/hudi/pull/1149#discussion_r458958468



##########
File path: hudi-client/src/main/java/org/apache/hudi/execution/CopyOnWriteInsertHandler.java
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.execution;
+
+import org.apache.hudi.client.SparkTaskContextSupplier;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.execution.LazyInsertIterable.HoodieInsertValueGenResult;
+import org.apache.hudi.io.HoodieWriteHandle;
+import org.apache.hudi.io.WriteHandleFactory;
+import org.apache.hudi.table.HoodieTable;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Consumes stream of hoodie records from in-memory queue and writes to one or more create-handles.
+ */
+public class CopyOnWriteInsertHandler<T extends HoodieRecordPayload>
+    extends
+    BoundedInMemoryQueueConsumer<HoodieInsertValueGenResult<HoodieRecord>, List<WriteStatus>> {
+
+  protected HoodieWriteConfig config;
+  protected String instantTime;
+  protected HoodieTable<T> hoodieTable;
+  protected String idPrefix;
+  protected int numFilesWritten;
+  protected SparkTaskContextSupplier sparkTaskContextSupplier;
+  protected WriteHandleFactory<T> writeHandleFactory;
+
+  protected final List<WriteStatus> statuses = new ArrayList<>();
+  protected Map<String, HoodieWriteHandle> handles = new HashMap<>();
+
+  public CopyOnWriteInsertHandler(
+      HoodieWriteConfig config, String instantTime, HoodieTable<T> hoodieTable, String idPrefix,
+      SparkTaskContextSupplier sparkTaskContextSupplier, WriteHandleFactory<T> writeHandleFactory) {
+    this.config = config;
+    this.instantTime = instantTime;
+    this.hoodieTable = hoodieTable;
+    this.idPrefix = idPrefix;
+    this.numFilesWritten = 0;
+    this.sparkTaskContextSupplier = sparkTaskContextSupplier;
+    this.writeHandleFactory = writeHandleFactory;
+  }
+
+  @Override
+  public void consumeOneRecord(HoodieInsertValueGenResult<HoodieRecord> payload) {
+    final HoodieRecord insertPayload = payload.record;
+    String partitionPath = insertPayload.getPartitionPath();
+    HoodieWriteHandle handle = handles.get(partitionPath);
+    // lazily initialize the handle, for the first time
+    if (handle == null) {
+      handle = writeHandleFactory.create(
+          config, instantTime, hoodieTable, insertPayload.getPartitionPath(),
+          idPrefix, sparkTaskContextSupplier);
+      handles.put(partitionPath, handle);
+    }
+
+    if (handle.canWrite(payload.record)) {
+      // write the payload, if the handle has capacity
+      handle.write(insertPayload, payload.insertValue, payload.exception);
+    } else {
+      // handle is full.
+      statuses.add(handle.close());
+      // Need to handle the rejected payload & open new handle
+      handle = writeHandleFactory.create(
+          config, instantTime, hoodieTable, insertPayload.getPartitionPath(),
+          idPrefix, sparkTaskContextSupplier);
+      handles.put(partitionPath, handle);
+      handle.write(insertPayload, payload.insertValue,
+          payload.exception); // we should be able to write 1 payload.
+    }
+  }
+
+  @Override
+  public void finish() {
+    for (HoodieWriteHandle handle : handles.values()) {

Review comment:
       Okay, makes sense. The key thing seems to be that the handle replaced in the `handles` map is just closed at the end. 
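
The handle lifecycle under discussion can be illustrated with a minimal sketch. It assumes hypothetical stand-in classes (a handle with a fixed record capacity and a string in place of `WriteStatus`) and is not the PR's implementation: one open handle is kept per partition path, a full handle is closed and replaced in the map, and whatever handles remain in the map are closed in `finish()`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a write handle that accepts a fixed number of records.
class RolloverHandleSketch {
  private final String partitionPath;
  private final int capacity;
  private int written = 0;

  RolloverHandleSketch(String partitionPath, int capacity) {
    this.partitionPath = partitionPath;
    this.capacity = capacity;
  }

  boolean canWrite() {
    return written < capacity;
  }

  void write(String record) {
    written++;
  }

  // Returns "<partitionPath>:<recordsWritten>" in place of a WriteStatus.
  String close() {
    return partitionPath + ":" + written;
  }
}

class RolloverInsertHandlerSketch {
  private final int capacity;
  private final Map<String, RolloverHandleSketch> handles = new HashMap<>();
  private final List<String> statuses = new ArrayList<>();

  RolloverInsertHandlerSketch(int capacity) {
    this.capacity = capacity;
  }

  void consumeOneRecord(String partitionPath, String record) {
    RolloverHandleSketch handle = handles.get(partitionPath);
    if (handle == null) {
      // Lazily open the first handle for this partition path.
      handle = new RolloverHandleSketch(partitionPath, capacity);
      handles.put(partitionPath, handle);
    }
    if (!handle.canWrite()) {
      // The handle is full: close it now and replace it in the map.
      statuses.add(handle.close());
      handle = new RolloverHandleSketch(partitionPath, capacity);
      handles.put(partitionPath, handle);
    }
    handle.write(record);
  }

  List<String> finish() {
    // Only the handles still present in the map are closed here.
    for (RolloverHandleSketch handle : handles.values()) {
      statuses.add(handle.close());
    }
    handles.clear();
    return statuses;
  }
}
```

Because the map always holds the most recent handle for each partition path, `finish()` closes each remaining handle exactly once.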







[GitHub] [hudi] nsivabalan edited a comment on pull request #1149: [WIP] [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert

Posted by GitBox <gi...@apache.org>.
nsivabalan edited a comment on pull request #1149:
URL: https://github.com/apache/hudi/pull/1149#issuecomment-660731926


   @yihua : I have fixed the flaky test. I assume you have resolved all feedback provided. If not, do let me know. As of now, I am assuming the patch is ready to review again. 





[GitHub] [incubator-hudi] vinothchandar commented on a change in pull request #1149: [WIP] [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on a change in pull request #1149: [WIP] [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert
URL: https://github.com/apache/incubator-hudi/pull/1149#discussion_r386218656
 
 

 ##########
 File path: hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java
 ##########
 @@ -381,20 +384,30 @@ public static SparkConf registerClasses(SparkConf conf) {
     }
   }
 
+  private BulkInsertMapFunction<T> getBulkInsertMapFunction(
 
 Review comment:
   Let's do what's easier and gets us moving faster. :)


[GitHub] [hudi] codecov-commenter commented on pull request #1149: [WIP] [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on pull request #1149:
URL: https://github.com/apache/hudi/pull/1149#issuecomment-652734921


   # [Codecov](https://codecov.io/gh/apache/hudi/pull/1149?src=pr&el=h1) Report
   > Merging [#1149](https://codecov.io/gh/apache/hudi/pull/1149?src=pr&el=desc) into [master](https://codecov.io/gh/apache/hudi/commit/2be924fd3a04e2106f1ad3f9ce91890ea58c8a88&el=desc) will **decrease** coverage by `7.05%`.
   > The diff coverage is `n/a`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/hudi/pull/1149/graphs/tree.svg?width=650&height=150&src=pr&token=VTTXabwbs2)](https://codecov.io/gh/apache/hudi/pull/1149?src=pr&el=tree)
   
   ```diff
   @@             Coverage Diff              @@
   ##             master    #1149      +/-   ##
   ============================================
   - Coverage     60.38%   53.32%   -7.06%     
   + Complexity     3605     2351    -1254     
   ============================================
     Files           436      302     -134     
     Lines         18949    13775    -5174     
     Branches       1914     1450     -464     
   ============================================
   - Hits          11442     7346    -4096     
   + Misses         6719     5866     -853     
   + Partials        788      563     -225     
   ```
   
   | Flag | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | #hudicli | `38.71% <ø> (-29.64%)` | `172.00 <ø> (-1254.00)` | |
   | #hudiclient | `?` | `?` | |
   | #hudicommon | `54.28% <ø> (ø)` | `1485.00 <ø> (ø)` | |
   | #hudihadoopmr | `39.36% <ø> (ø)` | `163.00 <ø> (ø)` | |
   | #hudihivesync | `72.25% <ø> (ø)` | `121.00 <ø> (ø)` | |
   | #hudispark | `44.22% <ø> (ø)` | `76.00 <ø> (ø)` | |
   | #huditimelineservice | `63.47% <ø> (ø)` | `47.00 <ø> (ø)` | |
   | #hudiutilities | `73.75% <ø> (ø)` | `287.00 <ø> (ø)` | |
   
   | [Impacted Files](https://codecov.io/gh/apache/hudi/pull/1149?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | [.../apache/hudi/client/AbstractHoodieWriteClient.java](https://codecov.io/gh/apache/hudi/pull/1149/diff?src=pr&el=tree#diff-aHVkaS1jbGllbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY2xpZW50L0Fic3RyYWN0SG9vZGllV3JpdGVDbGllbnQuamF2YQ==) | | | |
   | [...n/deltacommit/UpsertDeltaCommitActionExecutor.java](https://codecov.io/gh/apache/hudi/pull/1149/diff?src=pr&el=tree#diff-aHVkaS1jbGllbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdGFibGUvYWN0aW9uL2RlbHRhY29tbWl0L1Vwc2VydERlbHRhQ29tbWl0QWN0aW9uRXhlY3V0b3IuamF2YQ==) | | | |
   | [...c/main/java/org/apache/hudi/io/HoodieIOHandle.java](https://codecov.io/gh/apache/hudi/pull/1149/diff?src=pr&el=tree#diff-aHVkaS1jbGllbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvaW8vSG9vZGllSU9IYW5kbGUuamF2YQ==) | | | |
   | [...che/hudi/table/action/compact/OperationResult.java](https://codecov.io/gh/apache/hudi/pull/1149/diff?src=pr&el=tree#diff-aHVkaS1jbGllbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdGFibGUvYWN0aW9uL2NvbXBhY3QvT3BlcmF0aW9uUmVzdWx0LmphdmE=) | | | |
   | [...g/apache/hudi/table/action/BaseActionExecutor.java](https://codecov.io/gh/apache/hudi/pull/1149/diff?src=pr&el=tree#diff-aHVkaS1jbGllbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdGFibGUvYWN0aW9uL0Jhc2VBY3Rpb25FeGVjdXRvci5qYXZh) | | | |
   | [...hudi/table/action/commit/CommitActionExecutor.java](https://codecov.io/gh/apache/hudi/pull/1149/diff?src=pr&el=tree#diff-aHVkaS1jbGllbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdGFibGUvYWN0aW9uL2NvbW1pdC9Db21taXRBY3Rpb25FeGVjdXRvci5qYXZh) | | | |
   | [...e/hudi/table/action/clean/CleanActionExecutor.java](https://codecov.io/gh/apache/hudi/pull/1149/diff?src=pr&el=tree#diff-aHVkaS1jbGllbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdGFibGUvYWN0aW9uL2NsZWFuL0NsZWFuQWN0aW9uRXhlY3V0b3IuamF2YQ==) | | | |
   | [...apache/hudi/client/utils/LazyIterableIterator.java](https://codecov.io/gh/apache/hudi/pull/1149/diff?src=pr&el=tree#diff-aHVkaS1jbGllbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY2xpZW50L3V0aWxzL0xhenlJdGVyYWJsZUl0ZXJhdG9yLmphdmE=) | | | |
   | [...n/java/org/apache/hudi/index/HoodieIndexUtils.java](https://codecov.io/gh/apache/hudi/pull/1149/diff?src=pr&el=tree#diff-aHVkaS1jbGllbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvaW5kZXgvSG9vZGllSW5kZXhVdGlscy5qYXZh) | | | |
   | [...tion/commit/InsertPreppedCommitActionExecutor.java](https://codecov.io/gh/apache/hudi/pull/1149/diff?src=pr&el=tree#diff-aHVkaS1jbGllbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdGFibGUvYWN0aW9uL2NvbW1pdC9JbnNlcnRQcmVwcGVkQ29tbWl0QWN0aW9uRXhlY3V0b3IuamF2YQ==) | | | |
   | ... and [120 more](https://codecov.io/gh/apache/hudi/pull/1149/diff?src=pr&el=tree-more) | |
   





[GitHub] [incubator-hudi] n3nash commented on a change in pull request #1149: [WIP] [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert

Posted by GitBox <gi...@apache.org>.
n3nash commented on a change in pull request #1149: [WIP] [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert
URL: https://github.com/apache/incubator-hudi/pull/1149#discussion_r362994779
 
 

 ##########
 File path: hudi-client/src/main/java/org/apache/hudi/func/bulkinsert/NonSortPartitioner.java
 ##########
 @@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.func.bulkinsert;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.spark.api.java.JavaRDD;
+
+public class NonSortPartitioner<T extends HoodieRecordPayload>
 
 Review comment:
   We are using "Sort" in the other names; should we be consistent and use either "shuffling" or "sort"?


[GitHub] [incubator-hudi] n3nash commented on a change in pull request #1149: [WIP] [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert

Posted by GitBox <gi...@apache.org>.
n3nash commented on a change in pull request #1149: [WIP] [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert
URL: https://github.com/apache/incubator-hudi/pull/1149#discussion_r362995044
 
 

 ##########
 File path: hudi-client/src/main/java/org/apache/hudi/table/UserDefinedBulkInsertPartitioner.java
 ##########
 @@ -31,4 +31,6 @@
 public interface UserDefinedBulkInsertPartitioner<T extends HoodieRecordPayload> {
 
   JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> records, int outputSparkPartitions);
+
+  boolean arePartitionRecordsSorted();
 
 Review comment:
   +1


[GitHub] [hudi] yihua commented on pull request #1149: [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert

Posted by GitBox <gi...@apache.org>.
yihua commented on pull request #1149:
URL: https://github.com/apache/hudi/pull/1149#issuecomment-666757074


   I'll take a pass tonight.  Also, @nsivabalan could you `Squash and merge` in GitHub?  I'd like to keep the commits local in this PR.





[GitHub] [hudi] n3nash edited a comment on pull request #1149: [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert

Posted by GitBox <gi...@apache.org>.
n3nash edited a comment on pull request #1149:
URL: https://github.com/apache/hudi/pull/1149#issuecomment-669315726


   @nsivabalan thanks for the context. I see what you're saying. It's most likely only Uber using this special partitioner, so the blast radius is probably limited. I see that a change is required in either scenario; changing the name is fine and keeps it clear without the need to add a placeholder abstract class. We will absorb the backwards-incompatible change on our end.





[GitHub] [hudi] vinothchandar commented on pull request #1149: [WIP] [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on pull request #1149:
URL: https://github.com/apache/hudi/pull/1149#issuecomment-657946414


   @yihua is this ready for review? 





[GitHub] [hudi] nsivabalan commented on pull request #1149: [WIP] [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert

Posted by GitBox <gi...@apache.org>.
nsivabalan commented on pull request #1149:
URL: https://github.com/apache/hudi/pull/1149#issuecomment-660731926


   @vinothchandar : yes, the patch is ready for review. 





[GitHub] [incubator-hudi] vinothchandar commented on a change in pull request #1149: [WIP] [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on a change in pull request #1149: [WIP] [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert
URL: https://github.com/apache/incubator-hudi/pull/1149#discussion_r362295974
 
 

 ##########
 File path: hudi-client/src/main/java/org/apache/hudi/table/UserDefinedBulkInsertPartitioner.java
 ##########
 @@ -31,4 +31,6 @@
 public interface UserDefinedBulkInsertPartitioner<T extends HoodieRecordPayload> {
 
   JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> records, int outputSparkPartitions);
+
+  boolean arePartitionRecordsSorted();
 
 Review comment:
   Please improve the javadocs of this interface while you are here :) 


[GitHub] [hudi] vinothchandar commented on a change in pull request #1149: [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on a change in pull request #1149:
URL: https://github.com/apache/hudi/pull/1149#discussion_r461888109



##########
File path: hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
##########
@@ -161,6 +161,17 @@ public static void writePartitionMetadata(FileSystem fs, String[] partitionPaths
     }
   }
 
+  public static List<HoodieRecord> newHoodieRecords(int n, String time) throws Exception {

Review comment:
       Can we not use the existing methods in the data generator to write these tests?







[GitHub] [hudi] yihua commented on pull request #1149: [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert

Posted by GitBox <gi...@apache.org>.
yihua commented on pull request #1149:
URL: https://github.com/apache/hudi/pull/1149#issuecomment-664834387


   @vinothchandar @nsivabalan this PR is ready for another review.
   
   I fixed the failing tests.  I also simplified the bulk insert logic regarding different sort modes.  Besides, I added more javadocs and cleaned up the code style.
   
   For the default sort mode for bulk insert, shall we set it to a mode other than GLOBAL_SORT?
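
To make the choice of default concrete, here is a small sketch of how the sort modes differ. It rests on several assumptions: plain Java lists stand in for Spark RDD partitions, the enum and class names are hypothetical, and only `GLOBAL_SORT` is a name taken from this thread (`PARTITION_SORT` and `NONE` are illustrative labels for the partition-local and non-sorting modes).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical mode names; only GLOBAL_SORT appears in the discussion above.
enum SortModeSketch { NONE, GLOBAL_SORT, PARTITION_SORT }

class SortModeDemoSketch {
  // Splits keys into numPartitions (>= 1) lists according to the chosen mode.
  static List<List<String>> partition(List<String> keys, int numPartitions, SortModeSketch mode) {
    List<String> input = new ArrayList<>(keys);
    List<List<String>> out = new ArrayList<>();
    for (int i = 0; i < numPartitions; i++) {
      out.add(new ArrayList<>());
    }
    if (mode == SortModeSketch.GLOBAL_SORT) {
      // Sort everything, then cut into contiguous ranges: ordering holds across partitions.
      Collections.sort(input);
      int chunk = (input.size() + numPartitions - 1) / numPartitions;
      for (int i = 0; i < input.size(); i++) {
        out.get(i / chunk).add(input.get(i));
      }
      return out;
    }
    // Otherwise distribute round-robin without looking at the keys.
    for (int i = 0; i < input.size(); i++) {
      out.get(i % numPartitions).add(input.get(i));
    }
    if (mode == SortModeSketch.PARTITION_SORT) {
      // Sort only within each partition: no ordering guarantee across partitions.
      for (List<String> p : out) {
        Collections.sort(p);
      }
    }
    return out;
  }
}
```

In this toy model the global mode is the only one that needs to see all keys before placing any of them, which is roughly why it is the most expensive option to pick as a default.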





[GitHub] [incubator-hudi] yihua commented on a change in pull request #1149: [WIP] [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert

Posted by GitBox <gi...@apache.org>.
yihua commented on a change in pull request #1149: [WIP] [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert
URL: https://github.com/apache/incubator-hudi/pull/1149#discussion_r362302093
 
 

 ##########
 File path: hudi-client/src/main/java/org/apache/hudi/func/bulkinsert/NonSortPartitioner.java
 ##########
 @@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.func.bulkinsert;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.spark.api.java.JavaRDD;
+
+public class NonSortPartitioner<T extends HoodieRecordPayload>
 
 Review comment:
   Yes, will rename.


[GitHub] [incubator-hudi] yihua commented on pull request #1149: [WIP] [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert

Posted by GitBox <gi...@apache.org>.
yihua commented on pull request #1149:
URL: https://github.com/apache/incubator-hudi/pull/1149#issuecomment-624436427


   @vinothchandar @n3nash I'm still working on resolving one failing unit test.  Somehow all tests pass in CI, but locally this single test fails, `org.apache.hudi.table.action.commit.TestCopyOnWriteActionExecutor#testInsertUpsertWithHoodieAvroPayload`:
   
   ```
   Caused by: java.util.NoSuchElementException: No value present in Option
   	at org.apache.hudi.common.util.Option.get(Option.java:88)
   	at org.apache.hudi.io.HoodieMergeHandle.<init>(HoodieMergeHandle.java:77)
   	at org.apache.hudi.table.action.commit.CommitActionExecutor.getUpdateHandle(CommitActionExecutor.java:118)
   	at org.apache.hudi.table.action.commit.CommitActionExecutor.handleUpdate(CommitActionExecutor.java:75)
   	at org.apache.hudi.table.action.commit.TestCopyOnWriteActionExecutor.lambda$testInsertUpsertWithHoodieAvroPayload$446d3dad$1(TestCopyOnWriteActionExecutor.java:434)
   ```
   
   I don't believe I changed the merge logic.  If anyone familiar with this test can shed some light, that'd be great.
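
For readers unfamiliar with this kind of failure, the sketch below reproduces the general pattern behind the exception in the trace: calling `get()` on an empty optional value. It uses `java.util.Optional` as a stand-in for `org.apache.hudi.common.util.Option`, and the class and method names are hypothetical. It does not claim to identify which value is missing in `HoodieMergeHandle`.

```java
import java.util.NoSuchElementException;
import java.util.Optional;

class OptionGetSketch {
  // Unguarded access: throws NoSuchElementException when the value is absent.
  static String unguarded(Optional<String> value) {
    return value.get();
  }

  // Guarded access: falls back to a placeholder instead of throwing.
  static String guarded(Optional<String> value) {
    return value.orElse("<none>");
  }

  public static void main(String[] args) {
    System.out.println(guarded(Optional.of("some-value")));
    System.out.println(guarded(Optional.empty()));
    try {
      unguarded(Optional.empty());
    } catch (NoSuchElementException e) {
      System.out.println("unguarded get() failed: " + e.getMessage());
    }
  }
}
```

A test that passes in CI but fails locally with this pattern often points at state that is present in one environment and absent in the other, rather than at the logic that reads it.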





[GitHub] [incubator-hudi] codecov-io edited a comment on pull request #1149: [WIP] [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #1149:
URL: https://github.com/apache/incubator-hudi/pull/1149#issuecomment-623229474


   # [Codecov](https://codecov.io/gh/apache/incubator-hudi/pull/1149?src=pr&el=h1) Report
   > Merging [#1149](https://codecov.io/gh/apache/incubator-hudi/pull/1149?src=pr&el=desc) into [master](https://codecov.io/gh/apache/incubator-hudi/commit/506447fd4fde4cd922f7aa8f4e17a7f06666dc97&el=desc) will **decrease** coverage by `0.22%`.
   > The diff coverage is `55.17%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/incubator-hudi/pull/1149/graphs/tree.svg?width=650&height=150&src=pr&token=VTTXabwbs2)](https://codecov.io/gh/apache/incubator-hudi/pull/1149?src=pr&el=tree)
   
   ```diff
   @@             Coverage Diff              @@
   ##             master    #1149      +/-   ##
   ============================================
   - Coverage     71.82%   71.59%   -0.23%     
     Complexity      294      294              
   ============================================
     Files           385      393       +8     
     Lines         16549    16659     +110     
     Branches       1661     1663       +2     
   ============================================
   + Hits          11886    11927      +41     
   - Misses         3931     4000      +69     
     Partials        732      732              
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-hudi/pull/1149?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | [...sert/BulkInsertMapFunctionForNonSortedRecords.java](https://codecov.io/gh/apache/incubator-hudi/pull/1149/diff?src=pr&el=tree#diff-aHVkaS1jbGllbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvZXhlY3V0aW9uL2J1bGtpbnNlcnQvQnVsa0luc2VydE1hcEZ1bmN0aW9uRm9yTm9uU29ydGVkUmVjb3Jkcy5qYXZh) | `0.00% <0.00%> (ø)` | `0.00 <0.00> (?)` | |
   | [.../hudi/execution/bulkinsert/NonSortPartitioner.java](https://codecov.io/gh/apache/incubator-hudi/pull/1149/diff?src=pr&el=tree#diff-aHVkaS1jbGllbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvZXhlY3V0aW9uL2J1bGtpbnNlcnQvTm9uU29ydFBhcnRpdGlvbmVyLmphdmE=) | `0.00% <0.00%> (ø)` | `0.00 <0.00> (?)` | |
   | [...n/bulkinsert/RDDPartitionLocalSortPartitioner.java](https://codecov.io/gh/apache/incubator-hudi/pull/1149/diff?src=pr&el=tree#diff-aHVkaS1jbGllbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvZXhlY3V0aW9uL2J1bGtpbnNlcnQvUkREUGFydGl0aW9uTG9jYWxTb3J0UGFydGl0aW9uZXIuamF2YQ==) | `0.00% <0.00%> (ø)` | `0.00 <0.00> (?)` | |
   | [...ution/bulkinsert/RDDPartitionRangePartitioner.java](https://codecov.io/gh/apache/incubator-hudi/pull/1149/diff?src=pr&el=tree#diff-aHVkaS1jbGllbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvZXhlY3V0aW9uL2J1bGtpbnNlcnQvUkREUGFydGl0aW9uUmFuZ2VQYXJ0aXRpb25lci5qYXZh) | `0.00% <0.00%> (ø)` | `0.00 <0.00> (?)` | |
   | [...tion/bulkinsert/BulkInsertInternalPartitioner.java](https://codecov.io/gh/apache/incubator-hudi/pull/1149/diff?src=pr&el=tree#diff-aHVkaS1jbGllbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvZXhlY3V0aW9uL2J1bGtpbnNlcnQvQnVsa0luc2VydEludGVybmFsUGFydGl0aW9uZXIuamF2YQ==) | `53.84% <53.84%> (ø)` | `0.00 <0.00> (?)` | |
   | [...che/hudi/table/action/commit/BulkInsertHelper.java](https://codecov.io/gh/apache/incubator-hudi/pull/1149/diff?src=pr&el=tree#diff-aHVkaS1jbGllbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdGFibGUvYWN0aW9uL2NvbW1pdC9CdWxrSW5zZXJ0SGVscGVyLmphdmE=) | `74.19% <62.50%> (-10.81%)` | `0.00 <0.00> (ø)` | |
   | [...java/org/apache/hudi/config/HoodieWriteConfig.java](https://codecov.io/gh/apache/incubator-hudi/pull/1149/diff?src=pr&el=tree#diff-aHVkaS1jbGllbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY29uZmlnL0hvb2RpZVdyaXRlQ29uZmlnLmphdmE=) | `84.14% <80.76%> (-0.71%)` | `0.00 <0.00> (ø)` | |
   | [...pache/hudi/execution/CopyOnWriteInsertHandler.java](https://codecov.io/gh/apache/incubator-hudi/pull/1149/diff?src=pr&el=tree#diff-aHVkaS1jbGllbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvZXhlY3V0aW9uL0NvcHlPbldyaXRlSW5zZXJ0SGFuZGxlci5qYXZh) | `94.11% <94.11%> (ø)` | `0.00 <0.00> (?)` | |
   | [.../org/apache/hudi/execution/LazyInsertIterable.java](https://codecov.io/gh/apache/incubator-hudi/pull/1149/diff?src=pr&el=tree#diff-aHVkaS1jbGllbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvZXhlY3V0aW9uL0xhenlJbnNlcnRJdGVyYWJsZS5qYXZh) | `79.41% <100.00%> (-0.99%)` | `0.00 <0.00> (ø)` | |
   | [...di/execution/bulkinsert/BulkInsertMapFunction.java](https://codecov.io/gh/apache/incubator-hudi/pull/1149/diff?src=pr&el=tree#diff-aHVkaS1jbGllbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvZXhlY3V0aW9uL2J1bGtpbnNlcnQvQnVsa0luc2VydE1hcEZ1bmN0aW9uLmphdmE=) | `75.00% <100.00%> (ø)` | `0.00 <0.00> (?)` | |
   | ... and [13 more](https://codecov.io/gh/apache/incubator-hudi/pull/1149/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-hudi/pull/1149?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-hudi/pull/1149?src=pr&el=footer). Last update [506447f...1a4b9b9](https://codecov.io/gh/apache/incubator-hudi/pull/1149?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [incubator-hudi] codecov-io commented on pull request #1149: [WIP] [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert

Posted by GitBox <gi...@apache.org>.
codecov-io commented on pull request #1149:
URL: https://github.com/apache/incubator-hudi/pull/1149#issuecomment-623229474


   # [Codecov](https://codecov.io/gh/apache/incubator-hudi/pull/1149?src=pr&el=h1) Report
   > Merging [#1149](https://codecov.io/gh/apache/incubator-hudi/pull/1149?src=pr&el=desc) into [master](https://codecov.io/gh/apache/incubator-hudi/commit/506447fd4fde4cd922f7aa8f4e17a7f06666dc97&el=desc) will **decrease** coverage by `0.22%`.
   > The diff coverage is `55.17%`.
   
   ```diff
   @@             Coverage Diff              @@
   ##             master    #1149      +/-   ##
   ============================================
   - Coverage     71.82%   71.59%   -0.23%     
     Complexity      294      294              
   ============================================
     Files           385      393       +8     
     Lines         16549    16659     +110     
     Branches       1661     1663       +2     
   ============================================
   + Hits          11886    11927      +41     
   - Misses         3931     4000      +69     
     Partials        732      732              
   ```
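
The percentages in the coverage diff above can be reproduced from the raw counts. This is a minimal sketch, assuming coverage is computed as hits divided by total lines (hits + misses + partials); the variable and function names are illustrative and not part of Codecov or Hudi.

```python
# Counts copied from the coverage diff above (master vs. PR #1149).
master = {"hits": 11886, "misses": 3931, "partials": 732}
pr = {"hits": 11927, "misses": 4000, "partials": 732}


def coverage_pct(counts):
    """Line coverage as a percentage, assuming coverage = hits / total lines."""
    total = counts["hits"] + counts["misses"] + counts["partials"]
    return 100.0 * counts["hits"] / total


master_pct = coverage_pct(master)
pr_pct = coverage_pct(pr)
delta = pr_pct - master_pct

print(f"master: {master_pct:.2f}%")
print(f"PR:     {pr_pct:.2f}%")
print(f"delta:  {delta:+.2f}")
```

Under that assumption the totals match the `Lines` row (16549 and 16659), and the rounded values match the `Coverage` row (71.82, 71.59, -0.23). The unrounded difference is roughly 0.228, so the `0.22%` in the summary line is presumably the same figure truncated rather than rounded.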
   
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-hudi/pull/1149?src=pr&el=tree) | Coverage Δ | Complexity Δ | |
   |---|---|---|---|
   | [...sert/BulkInsertMapFunctionForNonSortedRecords.java](https://codecov.io/gh/apache/incubator-hudi/pull/1149/diff?src=pr&el=tree#diff-aHVkaS1jbGllbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvZXhlY3V0aW9uL2J1bGtpbnNlcnQvQnVsa0luc2VydE1hcEZ1bmN0aW9uRm9yTm9uU29ydGVkUmVjb3Jkcy5qYXZh) | `0.00% <0.00%> (ø)` | `0.00 <0.00> (?)` | |
   | [.../hudi/execution/bulkinsert/NonSortPartitioner.java](https://codecov.io/gh/apache/incubator-hudi/pull/1149/diff?src=pr&el=tree#diff-aHVkaS1jbGllbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvZXhlY3V0aW9uL2J1bGtpbnNlcnQvTm9uU29ydFBhcnRpdGlvbmVyLmphdmE=) | `0.00% <0.00%> (ø)` | `0.00 <0.00> (?)` | |
   | [...n/bulkinsert/RDDPartitionLocalSortPartitioner.java](https://codecov.io/gh/apache/incubator-hudi/pull/1149/diff?src=pr&el=tree#diff-aHVkaS1jbGllbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvZXhlY3V0aW9uL2J1bGtpbnNlcnQvUkREUGFydGl0aW9uTG9jYWxTb3J0UGFydGl0aW9uZXIuamF2YQ==) | `0.00% <0.00%> (ø)` | `0.00 <0.00> (?)` | |
   | [...ution/bulkinsert/RDDPartitionRangePartitioner.java](https://codecov.io/gh/apache/incubator-hudi/pull/1149/diff?src=pr&el=tree#diff-aHVkaS1jbGllbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvZXhlY3V0aW9uL2J1bGtpbnNlcnQvUkREUGFydGl0aW9uUmFuZ2VQYXJ0aXRpb25lci5qYXZh) | `0.00% <0.00%> (ø)` | `0.00 <0.00> (?)` | |
   | [...tion/bulkinsert/BulkInsertInternalPartitioner.java](https://codecov.io/gh/apache/incubator-hudi/pull/1149/diff?src=pr&el=tree#diff-aHVkaS1jbGllbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvZXhlY3V0aW9uL2J1bGtpbnNlcnQvQnVsa0luc2VydEludGVybmFsUGFydGl0aW9uZXIuamF2YQ==) | `53.84% <53.84%> (ø)` | `0.00 <0.00> (?)` | |
   | [...che/hudi/table/action/commit/BulkInsertHelper.java](https://codecov.io/gh/apache/incubator-hudi/pull/1149/diff?src=pr&el=tree#diff-aHVkaS1jbGllbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvdGFibGUvYWN0aW9uL2NvbW1pdC9CdWxrSW5zZXJ0SGVscGVyLmphdmE=) | `74.19% <62.50%> (-10.81%)` | `0.00 <0.00> (ø)` | |
   | [...java/org/apache/hudi/config/HoodieWriteConfig.java](https://codecov.io/gh/apache/incubator-hudi/pull/1149/diff?src=pr&el=tree#diff-aHVkaS1jbGllbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvY29uZmlnL0hvb2RpZVdyaXRlQ29uZmlnLmphdmE=) | `84.14% <80.76%> (-0.71%)` | `0.00 <0.00> (ø)` | |
   | [...pache/hudi/execution/CopyOnWriteInsertHandler.java](https://codecov.io/gh/apache/incubator-hudi/pull/1149/diff?src=pr&el=tree#diff-aHVkaS1jbGllbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvZXhlY3V0aW9uL0NvcHlPbldyaXRlSW5zZXJ0SGFuZGxlci5qYXZh) | `94.11% <94.11%> (ø)` | `0.00 <0.00> (?)` | |
   | [.../org/apache/hudi/execution/LazyInsertIterable.java](https://codecov.io/gh/apache/incubator-hudi/pull/1149/diff?src=pr&el=tree#diff-aHVkaS1jbGllbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvZXhlY3V0aW9uL0xhenlJbnNlcnRJdGVyYWJsZS5qYXZh) | `79.41% <100.00%> (-0.99%)` | `0.00 <0.00> (ø)` | |
   | [...di/execution/bulkinsert/BulkInsertMapFunction.java](https://codecov.io/gh/apache/incubator-hudi/pull/1149/diff?src=pr&el=tree#diff-aHVkaS1jbGllbnQvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2h1ZGkvZXhlY3V0aW9uL2J1bGtpbnNlcnQvQnVsa0luc2VydE1hcEZ1bmN0aW9uLmphdmE=) | `75.00% <100.00%> (ø)` | `0.00 <0.00> (?)` | |
   | ... and [13 more](https://codecov.io/gh/apache/incubator-hudi/pull/1149/diff?src=pr&el=tree-more) | |
   
   





[GitHub] [incubator-hudi] vinothchandar commented on pull request #1149: [WIP] [HUDI-472] Introduce configurations and new modes of sorting for bulk_insert

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on pull request #1149:
URL: https://github.com/apache/incubator-hudi/pull/1149#issuecomment-624115902


   @yihua still on this? 
   @n3nash could you review this please?

