You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/05/11 07:05:31 UTC

[GitHub] [hudi] codope commented on a diff in pull request #5501: [HUDI-4018][HUDI-4027] Adding integ test yamls for immutable use-cases. Added delete partition support to integ tests

codope commented on code in PR #5501:
URL: https://github.com/apache/hudi/pull/5501#discussion_r869901238


##########
hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkDeletePartitionNode.scala:
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.integ.testsuite.dag.nodes
+
+
+import org.apache.avro.Schema
+import org.apache.hudi.client.WriteStatus
+import org.apache.hudi.common.util.collection.Pair
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config
+import org.apache.hudi.integ.testsuite.dag.ExecutionContext
+import org.apache.hudi.integ.testsuite.writer.DeltaWriteStats
+import org.apache.hudi.{AvroConversionUtils, DataSourceWriteOptions, HoodieSparkUtils}
+import org.apache.log4j.LogManager
+import org.apache.spark.api.java.JavaRDD
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SaveMode
+
+import scala.collection.JavaConverters._
+
+/**
+ * Spark datasource based insert node
+ *
+ * @param dagNodeConfig DAG node configurations.
+ */
+class SparkDeletePartitionNode(dagNodeConfig: Config) extends DagNode[RDD[WriteStatus]] {
+
+  private val log = LogManager.getLogger(getClass)
+  config = dagNodeConfig
+
+  /**
+   * Execute the {@link DagNode}.
+   *
+   * @param context     The context needed for an execution of a node.
+   * @param curItrCount iteration count for executing the node.
+   * @throws Exception Thrown if the execution failed.
+   */
+  override def execute(context: ExecutionContext, curItrCount: Int): Unit = {
+    println("Generating input data for node {}", this.getName)
+
+    context.getWriterContext.getSparkSession.emptyDataFrame.write.format("hudi")
+      .options(DataSourceWriteOptions.translateSqlOptions(context.getWriterContext.getProps.asScala.toMap))
+      .option(DataSourceWriteOptions.PRECOMBINE_FIELD.key(), "test_suite_source_ordering_field")

Review Comment:
   `test_suite_source_ordering_field` is used across other nodes as well, maybe extract to a constant?



##########
hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/BaseValidateDatasetNode.java:
##########
@@ -157,14 +157,19 @@ public void execute(ExecutionContext context, int curItrCount) throws Exception
     }
   }
 
-  private Dataset<Row> getInputDf(ExecutionContext context, SparkSession session, String inputPath) {
+  private Dataset<Row> getInputDf(ExecutionContext context, SparkSession session, String inputPath, String partitionsToSkipWithValidate) {
     String recordKeyField = context.getWriterContext().getProps().getString(DataSourceWriteOptions.RECORDKEY_FIELD().key());
     String partitionPathField = context.getWriterContext().getProps().getString(DataSourceWriteOptions.PARTITIONPATH_FIELD().key());
     // todo: fix hard coded fields from configs.
     // read input and resolve insert, updates, etc.
     Dataset<Row> inputDf = session.read().format("avro").load(inputPath);
+    Dataset<Row> trimmedDf = inputDf;
+    if (!config.partitonsToSkipWithValidate().isEmpty()) {

Review Comment:
   Let's use the passed argument instead of config, or maybe remove the argument?



##########
docker/demo/config/test-suite/multi-writer-local-4.properties:
##########
@@ -0,0 +1,57 @@
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+hoodie.insert.shuffle.parallelism=2

Review Comment:
   optional: should we keep these parallelism values default?



##########
hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/BaseValidateDatasetNode.java:
##########
@@ -157,14 +157,19 @@ public void execute(ExecutionContext context, int curItrCount) throws Exception
     }
   }
 
-  private Dataset<Row> getInputDf(ExecutionContext context, SparkSession session, String inputPath) {
+  private Dataset<Row> getInputDf(ExecutionContext context, SparkSession session, String inputPath, String partitionsToSkipWithValidate) {
     String recordKeyField = context.getWriterContext().getProps().getString(DataSourceWriteOptions.RECORDKEY_FIELD().key());
     String partitionPathField = context.getWriterContext().getProps().getString(DataSourceWriteOptions.PARTITIONPATH_FIELD().key());
     // todo: fix hard coded fields from configs.
     // read input and resolve insert, updates, etc.
     Dataset<Row> inputDf = session.read().format("avro").load(inputPath);
+    Dataset<Row> trimmedDf = inputDf;
+    if (!config.partitonsToSkipWithValidate().isEmpty()) {
+      trimmedDf = inputDf.filter("instr("+partitionPathField+", \'"+ config.partitonsToSkipWithValidate() +"\') != 1");

Review Comment:
   This would work with the value i.e. '0' passed in the config. I think a more fool-proof way than substring check would be to use equals check with some FSUtil method to get relative partition path.



##########
docker/demo/config/test-suite/non-core-operations.yaml:
##########
@@ -0,0 +1,204 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+dag_name: simple-deltastreamer.yaml
+dag_rounds: 1
+dag_intermittent_delay_mins: 1
+dag_content:
+  first_insert:
+    config:
+      record_size: 1000
+      num_partitions_insert: 10
+      repeat_count: 1
+      num_records_insert: 10000
+    type: SparkInsertNode
+    deps: none
+  first_upsert:
+    config:
+      record_size: 1000
+      num_partitions_insert: 10
+      num_records_insert: 1000
+      repeat_count: 1
+      num_records_upsert: 8000
+      num_partitions_upsert: 10
+    type: SparkUpsertNode
+    deps: first_insert
+  second_insert:
+    config:
+      record_size: 1000
+      num_partitions_insert: 10
+      repeat_count: 1
+      num_records_insert: 10000
+    type: SparkInsertNode
+    deps: first_upsert
+  second_upsert:
+    config:
+      record_size: 1000
+      num_partitions_insert: 10
+      num_records_insert: 1000
+      repeat_count: 1
+      num_records_upsert: 8000
+      num_partitions_upsert: 10
+    type: SparkUpsertNode
+    deps: second_insert
+  first_insert_overwrite:
+    config:
+      record_size: 1000
+      num_partitions_insert: 10
+      repeat_count: 1
+      num_records_insert: 10
+    type: SparkInsertOverwriteNode
+    deps: second_upsert
+  delete_all_input_except_last:
+    config:
+      delete_input_data_except_latest: true
+    type: DeleteInputDatasetNode
+    deps: first_insert_overwrite
+  third_insert:
+    config:
+      record_size: 1000
+      num_partitions_insert: 10
+      repeat_count: 1
+      num_records_insert: 10000
+    type: SparkInsertNode
+    deps: delete_all_input_except_last
+  third_upsert:
+    config:
+      record_size: 1000
+      num_partitions_insert: 10
+      num_records_insert: 1000
+      repeat_count: 1
+      num_records_upsert: 8000
+      num_partitions_upsert: 10
+    type: SparkUpsertNode
+    deps: third_insert
+  second_validate:

Review Comment:
   this is the first validate right? should we add first_validate after first_insert_overwrite?



##########
docker/demo/config/test-suite/non-core-operations.yaml:
##########
@@ -0,0 +1,204 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+dag_name: simple-deltastreamer.yaml

Review Comment:
   How about keeping different dag_name as per use case?



##########
docker/demo/config/test-suite/non-core-operations.yaml:
##########
@@ -0,0 +1,204 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+dag_name: simple-deltastreamer.yaml
+dag_rounds: 1
+dag_intermittent_delay_mins: 1
+dag_content:
+  first_insert:
+    config:
+      record_size: 1000
+      num_partitions_insert: 10
+      repeat_count: 1
+      num_records_insert: 10000
+    type: SparkInsertNode
+    deps: none
+  first_upsert:
+    config:
+      record_size: 1000
+      num_partitions_insert: 10
+      num_records_insert: 1000
+      repeat_count: 1
+      num_records_upsert: 8000
+      num_partitions_upsert: 10
+    type: SparkUpsertNode
+    deps: first_insert
+  second_insert:
+    config:
+      record_size: 1000
+      num_partitions_insert: 10
+      repeat_count: 1
+      num_records_insert: 10000
+    type: SparkInsertNode
+    deps: first_upsert
+  second_upsert:
+    config:
+      record_size: 1000
+      num_partitions_insert: 10
+      num_records_insert: 1000
+      repeat_count: 1
+      num_records_upsert: 8000
+      num_partitions_upsert: 10
+    type: SparkUpsertNode
+    deps: second_insert
+  first_insert_overwrite:
+    config:
+      record_size: 1000
+      num_partitions_insert: 10
+      repeat_count: 1
+      num_records_insert: 10
+    type: SparkInsertOverwriteNode
+    deps: second_upsert
+  delete_all_input_except_last:
+    config:
+      delete_input_data_except_latest: true
+    type: DeleteInputDatasetNode
+    deps: first_insert_overwrite
+  third_insert:
+    config:
+      record_size: 1000
+      num_partitions_insert: 10
+      repeat_count: 1
+      num_records_insert: 10000
+    type: SparkInsertNode
+    deps: delete_all_input_except_last
+  third_upsert:
+    config:
+      record_size: 1000
+      num_partitions_insert: 10
+      num_records_insert: 1000
+      repeat_count: 1
+      num_records_upsert: 8000
+      num_partitions_upsert: 10
+    type: SparkUpsertNode
+    deps: third_insert
+  second_validate:
+    config:
+      validate_full_data : true
+      validate_hive: false
+      delete_input_data: false
+    type: ValidateDatasetNode
+    deps: third_upsert
+  fourth_insert:
+    config:
+      record_size: 1000
+      num_partitions_insert: 10
+      repeat_count: 1
+      num_records_insert: 10000
+    type: SparkInsertNode
+    deps: second_validate
+  fourth_upsert:
+    config:
+      record_size: 1000
+      num_partitions_insert: 10
+      num_records_insert: 1000
+      repeat_count: 1
+      num_records_upsert: 8000
+      num_partitions_upsert: 10
+    type: SparkUpsertNode
+    deps: fourth_insert
+  fifth_insert:
+    config:
+      record_size: 1000
+      num_partitions_insert: 10
+      repeat_count: 1
+      num_records_insert: 10000
+    type: SparkInsertNode
+    deps: fourth_upsert
+  fifth_upsert:
+    config:
+      record_size: 1000
+      num_partitions_insert: 10
+      num_records_insert: 1000
+      repeat_count: 1
+      num_records_upsert: 8000
+      num_partitions_upsert: 10
+    type: SparkUpsertNode
+    deps: fifth_insert
+  first_insert_overwrite_table:

Review Comment:
   should it not be second_insert_overwrite_table? Or is it first because after all it's an overwrite so doesn't matter what happened before this point?



##########
docker/demo/config/test-suite/multi-writer-local-4.properties:
##########
@@ -0,0 +1,57 @@
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+hoodie.insert.shuffle.parallelism=2
+hoodie.upsert.shuffle.parallelism=2
+hoodie.bulkinsert.shuffle.parallelism=2
+hoodie.delete.shuffle.parallelism=2
+
+hoodie.metadata.enable=false
+
+hoodie.deltastreamer.source.test.num_partitions=100
+hoodie.deltastreamer.source.test.datagen.use_rocksdb_for_storing_existing_keys=false
+hoodie.deltastreamer.source.test.max_unique_records=100000000
+hoodie.embed.timeline.server=false
+hoodie.deltastreamer.source.input.selector=org.apache.hudi.integ.testsuite.helpers.DFSTestSuitePathSelector
+
+hoodie.deltastreamer.source.input.selector=org.apache.hudi.integ.testsuite.helpers.DFSTestSuitePathSelector
+hoodie.datasource.hive_sync.skip_ro_suffix=true
+
+hoodie.datasource.write.recordkey.field=_row_key
+hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.TimestampBasedKeyGenerator
+hoodie.datasource.write.partitionpath.field=timestamp
+
+hoodie.write.concurrency.mode=optimistic_concurrency_control
+hoodie.cleaner.policy.failed.writes=LAZY
+hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.InProcessLockProvider
+
+hoodie.deltastreamer.source.dfs.root=/tmp/hudi/input4

Review Comment:
   is it only the dfs root path that differs across multiple writers? I'm thinking if we can avoid duplication. 



##########
hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieMultiWriterTestSuiteJob.java:
##########
@@ -131,11 +133,20 @@ public static void main(String[] args) throws Exception {
 
     AtomicBoolean jobFailed = new AtomicBoolean(false);
     AtomicInteger counter = new AtomicInteger(0);
+    List<Long> waitTimes = new ArrayList<>();
+    for (int i = 0;i < jobIndex ;i++) {
+      if (i == 0) {
+        waitTimes.add(0L);
+      } else {
+        // every job after 1st, will start after 1 min + some delta.
+        waitTimes.add(60000L + random.nextInt(10000));

Review Comment:
   should we make this delay configurable as well?



##########
docker/demo/config/test-suite/multi-writer-local-4.properties:
##########
@@ -0,0 +1,57 @@
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+hoodie.insert.shuffle.parallelism=2
+hoodie.upsert.shuffle.parallelism=2
+hoodie.bulkinsert.shuffle.parallelism=2
+hoodie.delete.shuffle.parallelism=2
+
+hoodie.metadata.enable=false
+
+hoodie.deltastreamer.source.test.num_partitions=100
+hoodie.deltastreamer.source.test.datagen.use_rocksdb_for_storing_existing_keys=false
+hoodie.deltastreamer.source.test.max_unique_records=100000000
+hoodie.embed.timeline.server=false
+hoodie.deltastreamer.source.input.selector=org.apache.hudi.integ.testsuite.helpers.DFSTestSuitePathSelector
+
+hoodie.deltastreamer.source.input.selector=org.apache.hudi.integ.testsuite.helpers.DFSTestSuitePathSelector
+hoodie.datasource.hive_sync.skip_ro_suffix=true
+
+hoodie.datasource.write.recordkey.field=_row_key
+hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.TimestampBasedKeyGenerator
+hoodie.datasource.write.partitionpath.field=timestamp
+
+hoodie.write.concurrency.mode=optimistic_concurrency_control
+hoodie.cleaner.policy.failed.writes=LAZY
+hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.InProcessLockProvider
+
+hoodie.deltastreamer.source.dfs.root=/tmp/hudi/input4

Review Comment:
   Separate files does give us the flexibility to set other configs and make each writer behave differently. 



##########
docker/demo/config/test-suite/delete-partition.yaml:
##########
@@ -0,0 +1,57 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+dag_name: simple-deltastreamer.yaml
+dag_rounds: 1
+dag_intermittent_delay_mins: 1
+dag_content:
+  first_insert:
+    config:
+      record_size: 1000
+      num_partitions_insert: 5
+      repeat_count: 1
+      num_records_insert: 10
+    type: SparkInsertNode
+    deps: none
+  first_delete_partition:
+    config:
+      partitions_to_delete: "1969/12/31"
+    type: SparkDeletePartitionNode
+    deps: first_insert
+  second_validate:
+    config:
+      validate_full_data : true
+      partitions_to_skip_validate : "0"

Review Comment:
   optional: should we rename this to `input_partitions_to_skip_validate`? It wasn't clear to me why this has '0' value when hudi table has date partitions until I read the code in BaseValidateDatasetNode.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org