You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by "nsivabalan (via GitHub)" <gi...@apache.org> on 2023/02/16 17:24:14 UTC

[GitHub] [hudi] nsivabalan commented on a diff in pull request #7982: [HUDI-5813] Quarantine table support for error events

nsivabalan commented on code in PR #7982:
URL: https://github.com/apache/hudi/pull/7982#discussion_r1108765957


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieQuarantineTableConfig.java:
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.config;
+
+import org.apache.hudi.common.config.ConfigClassProperty;
+import org.apache.hudi.common.config.ConfigGroups;
+import org.apache.hudi.common.config.ConfigProperty;
+
+import javax.annotation.concurrent.Immutable;
+
+@Immutable
+@ConfigClassProperty(name = "Quarantine table Configs",
+    groupName = ConfigGroups.Names.WRITE_CLIENT,
+    description = "Configurations that are required for Quarantine table configs")
+public class HoodieQuarantineTableConfig {
+  public static final ConfigProperty<Boolean> QUARANTINE_TABLE_ENABLED = ConfigProperty
+      .key("hoodie.deltastreamer.quarantinetable.enable")
+      .defaultValue(false)
+      .withDocumentation("config to enable quarantine table");
+
+  public static final ConfigProperty<String> QUARANTINE_TABLE_BASE_PATH = ConfigProperty
+      .key("hoodie.deltastreamer.quarantinetable.base.path")
+      .noDefaultValue()
+      .withDocumentation("base path for quarantine table");
+
+  public static final ConfigProperty<String> QUARANTINE_TARGET_TABLE = ConfigProperty
+      .key("hoodie.deltastreamer.quarantinetable.target.table.name")

Review Comment:
   Do we have any concrete implementation as part of this patch?
   If not, I assume these configs are unused as of now. Wondering if it's worth adding them without any use.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieQuarantineTableConfig.java:
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.config;
+
+import org.apache.hudi.common.config.ConfigClassProperty;
+import org.apache.hudi.common.config.ConfigGroups;
+import org.apache.hudi.common.config.ConfigProperty;
+
+import javax.annotation.concurrent.Immutable;
+
+@Immutable
+@ConfigClassProperty(name = "Quarantine table Configs",
+    groupName = ConfigGroups.Names.WRITE_CLIENT,
+    description = "Configurations that are required for Quarantine table configs")
+public class HoodieQuarantineTableConfig {
+  public static final ConfigProperty<Boolean> QUARANTINE_TABLE_ENABLED = ConfigProperty
+      .key("hoodie.deltastreamer.quarantinetable.enable")
+      .defaultValue(false)
+      .withDocumentation("config to enable quarantine table");
+
+  public static final ConfigProperty<String> QUARANTINE_TABLE_BASE_PATH = ConfigProperty
+      .key("hoodie.deltastreamer.quarantinetable.base.path")
+      .noDefaultValue()
+      .withDocumentation("base path for quarantine table");
+
+  public static final ConfigProperty<String> QUARANTINE_TARGET_TABLE = ConfigProperty
+      .key("hoodie.deltastreamer.quarantinetable.target.table.name")
+      .noDefaultValue()
+      .withDocumentation("target table name for quarantine table");
+
+  public static final ConfigProperty<Integer> QUARANTINE_TABLE_UPSERT_PARALLELISM_VALUE = ConfigProperty
+      .key("hoodie.deltastreamer.quarantinetable.upsert.shuffle.parallelism")
+      .defaultValue(200)
+      .withDocumentation("config to set upsert shuffle parallelism");
+
+  public static final ConfigProperty<Integer> QUARANTINE_TABLE_INSERT_PARALLELISM_VALUE = ConfigProperty
+      .key("hoodie.deltastreamer.quarantinetable.insert.shuffle.parallelism")
+      .defaultValue(200)
+      .withDocumentation("config to set insert shuffle parallelism");
+
+  public static final ConfigProperty<String> QUARANTINE_TABLE_WRITER_CLASS = ConfigProperty
+      .key("hoodie.deltastreamer.quarantinetable.writer.class")
+      .noDefaultValue()
+      .withDocumentation("Class which handles the quarantine table writes");

Review Comment:
   we need more docs around this



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/BaseQuarantineTableWriter.java:
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.deltastreamer;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SparkSession;
+
+public abstract class BaseQuarantineTableWriter<T extends QuarantineEvent> {
+
+  public static String QUARANTINE_PAYLOAD_CLASS = "org.apache.hudi.common.model.OverwriteWithLatestAvroPayload";
+
+  public static String QUARANTINE_TABLE_CURRUPT_RECORD_COL_NAME = "_corrupt_record";
+
+  public BaseQuarantineTableWriter(HoodieDeltaStreamer.Config cfg, SparkSession sparkSession,
+                                   TypedProperties props, JavaSparkContext jssc, FileSystem fs) {
+  }
+
+  public abstract HoodieWriteConfig getQuarantineTableWriteConfig();
+
+  public abstract HoodieDeltaStreamer.Config getSourceDeltaStreamerConfig();
+
+  /***
+   *
+   * @param errorEvent
+   */
+  public abstract void addErrorEvents(JavaRDD<T> errorEvent);

Review Comment:
   we need more docs for such public interfaces



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/BaseQuarantineTableWriter.java:
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.deltastreamer;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SparkSession;
+
+public abstract class BaseQuarantineTableWriter<T extends QuarantineEvent> {

Review Comment:
   Please add Javadocs for this public interface.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java:
##########
@@ -710,6 +738,18 @@ private Pair<Option<String>, JavaRDD<WriteStatus>> writeToSink(JavaRDD<HoodieRec
             + totalErrorRecords + "/" + totalRecords);
       }
       String commitActionType = CommitUtils.getCommitActionType(cfg.operation, HoodieTableType.valueOf(cfg.tableType));
+      if (quarantineTableWriterInterfaceImpl.isPresent()) {
+        String qurantineTableStartInstant = quarantineTableWriterInterfaceImpl.get().startCommit();
+        quarantineTableWriterInterfaceImpl.get().addErrorEvents(getErrorEventsForWriteStatus(writeStatusRDD));
+        Option<String> commitedInstantTime = getLatestInstantWithValidCheckpointInfo(commitTimelineOpt);
+        boolean quarantineTableSuccess = quarantineTableWriterInterfaceImpl.get().upsertAndCommit(qurantineTableStartInstant, instantTime, commitedInstantTime);
+        if (!quarantineTableSuccess) {
+          LOG.info("Qurantine Table Commit " + qurantineTableStartInstant + " failed!");
+          LOG.info("Commit " + instantTime + " failed!");
+          writeClient.rollback(instantTime);

Review Comment:
   We should also probably add a config, along with quarantine, that controls whether to roll back the original commit or proceed without failing. Maybe some users feel quarantine is best-effort and may not prefer failing the original writes.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/BaseQuarantineTableWriter.java:
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.deltastreamer;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SparkSession;
+
+public abstract class BaseQuarantineTableWriter<T extends QuarantineEvent> {
+
+  public static String QUARANTINE_PAYLOAD_CLASS = "org.apache.hudi.common.model.OverwriteWithLatestAvroPayload";
+
+  public static String QUARANTINE_TABLE_CURRUPT_RECORD_COL_NAME = "_corrupt_record";

Review Comment:
   docs on what this is about.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java:
##########
@@ -236,6 +239,8 @@ public class DeltaSync implements Serializable, Closeable {
    */
   private transient SparkRDDWriteClient writeClient;
 
+  private Option<BaseQuarantineTableWriter> quarantineTableWriterInterfaceImpl = Option.empty();
+

Review Comment:
   Can we call this "quarantineTableWriter"?



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java:
##########
@@ -50,10 +61,55 @@ public final class SourceFormatAdapter implements Closeable {
 
   private final Source source;
 
+  private Option<BaseQuarantineTableWriter> quarantineTableWriterInterface = Option.empty();
+
   public SourceFormatAdapter(Source source) {
     this.source = source;
   }
 
+  public SourceFormatAdapter(Source source, Option<BaseQuarantineTableWriter> quarantineTableWriterInterface) {

Review Comment:
   Can you chain the constructors? The other constructor (L66) should call this one with Option.empty() as the last argument.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java:
##########
@@ -100,7 +155,9 @@ public InputBatch<JavaRDD<GenericRecord>> fetchNewDataInAvroFormat(Option<String
   public InputBatch<Dataset<Row>> fetchNewDataInRowFormat(Option<String> lastCkptStr, long sourceLimit) {
     switch (source.getSourceType()) {
       case ROW:
-        return ((Source<Dataset<Row>>) source).fetchNext(lastCkptStr, sourceLimit);
+        InputBatch<Dataset<Row>> datasetInputBatch = ((Source<Dataset<Row>>) source).fetchNext(lastCkptStr, sourceLimit);
+        return new InputBatch<>(transformDatasetWithQuarantineEvents(datasetInputBatch.getBatch()),

Review Comment:
   nit: rename to "mayBeProcessQuarantineEvents"



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/BaseQuarantineTableWriter.java:
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.deltastreamer;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SparkSession;
+
+public abstract class BaseQuarantineTableWriter<T extends QuarantineEvent> {

Review Comment:
   Let's call out the sequence of calls showing how this might be used.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org