Posted to commits@hudi.apache.org by "nsivabalan (via GitHub)" <gi...@apache.org> on 2023/02/27 07:39:59 UTC

[GitHub] [hudi] nsivabalan commented on a diff in pull request #7982: [HUDI-5813] Error table support for error events

nsivabalan commented on code in PR #7982:
URL: https://github.com/apache/hudi/pull/7982#discussion_r1118345162


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/BaseErrorTableWriter.java:
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.deltastreamer;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.VisibleForTesting;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SparkSession;
+
+/**
+ * The class which handles error events when Delta Streamer syncs data from the source. All the
+ * records which Delta Streamer is not able to process are sent as error events to
+ * BaseErrorTableWriter. The implementation of BaseErrorTableWriter processes
+ * these error events through the addErrorEvents API and commits them to the error table when
+ * the upsertAndCommit API is called.
+ *
+ * The writer can use the configs defined in HoodieErrorTableConfig to manage the error table.
+ */
+public abstract class BaseErrorTableWriter<T extends ErrorEvent> {
+
+  // The column name passed to Spark for option `columnNameOfCorruptRecord`. The record
+  // is set to this column in case of an error
+  public static final String ERROR_TABLE_CORRUPT_RECORD_COL_NAME = "_corrupt_record";
+
+  public BaseErrorTableWriter(HoodieDeltaStreamer.Config cfg, SparkSession sparkSession,

Review Comment:
   I feel this constructor should just take in properties and not the Deltastreamer config. This should be very generic: irrespective of whether one writes via Deltastreamer or the Spark datasource, they should be able to enable the error table.
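   For illustration, a write-path-agnostic constructor might look like this sketch (the exact signature is a suggestion, not the final API):
   
   ```java
   import org.apache.hudi.common.config.TypedProperties;
   
   import org.apache.hadoop.fs.FileSystem;
   import org.apache.spark.api.java.JavaSparkContext;
   import org.apache.spark.sql.SparkSession;
   
   public abstract class BaseErrorTableWriter<T extends ErrorEvent> {
   
     // All error table settings come from TypedProperties, so Deltastreamer,
     // the Spark datasource, and Spark SQL can all instantiate the writer.
     public BaseErrorTableWriter(TypedProperties props, SparkSession sparkSession,
                                 JavaSparkContext jssc, FileSystem fs) {
     }
   }
   ```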



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieErrorTableConfig.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.config;
+
+import org.apache.hudi.common.config.ConfigClassProperty;
+import org.apache.hudi.common.config.ConfigGroups;
+import org.apache.hudi.common.config.ConfigProperty;
+
+import javax.annotation.concurrent.Immutable;
+
+import java.util.Arrays;
+
+@Immutable
+@ConfigClassProperty(name = "Error table Configs",
+    groupName = ConfigGroups.Names.WRITE_CLIENT,
+    description = "Configurations that are required for Error table configs")
+public class HoodieErrorTableConfig {
+  public static final ConfigProperty<Boolean> ERROR_TABLE_ENABLED = ConfigProperty
+      .key("hoodie.deltastreamer.errortable.enable")

Review Comment:
   Why is this specific to Deltastreamer?
   Maybe "hoodie.error.table.enable" is better, so that one can enable the error table for any writes (Spark datasource, Spark SQL, etc.).



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieErrorTableConfig.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.config;
+
+import org.apache.hudi.common.config.ConfigClassProperty;
+import org.apache.hudi.common.config.ConfigGroups;
+import org.apache.hudi.common.config.ConfigProperty;
+
+import javax.annotation.concurrent.Immutable;
+
+import java.util.Arrays;
+
+@Immutable
+@ConfigClassProperty(name = "Error table Configs",
+    groupName = ConfigGroups.Names.WRITE_CLIENT,
+    description = "Configurations that are required for Error table configs")
+public class HoodieErrorTableConfig {
+  public static final ConfigProperty<Boolean> ERROR_TABLE_ENABLED = ConfigProperty
+      .key("hoodie.deltastreamer.errortable.enable")
+      .defaultValue(false)
+      .withDocumentation("Config to enable error table. If the config is enabled, "
+          + "all the records with processing error in DeltaStreamer are transferred to error table.");
+
+  public static final ConfigProperty<String> ERROR_TABLE_BASE_PATH = ConfigProperty
+      .key("hoodie.deltastreamer.errortable.base.path")
+      .noDefaultValue()
+      .withDocumentation("Base path for error table under which all error records "
+          + "would be stored.");
+
+  public static final ConfigProperty<String> ERROR_TARGET_TABLE = ConfigProperty
+      .key("hoodie.deltastreamer.errortable.target.table.name")
+      .noDefaultValue()
+      .withDocumentation("Table name to be used for the error table");
+
+  public static final ConfigProperty<Integer> ERROR_TABLE_UPSERT_PARALLELISM_VALUE = ConfigProperty
+      .key("hoodie.deltastreamer.errortable.upsert.shuffle.parallelism")
+      .defaultValue(200)
+      .withDocumentation("Config to set upsert shuffle parallelism. The config is similar to "
+          + "hoodie.upsert.shuffle.parallelism config but applies to the error table.");
+
+  public static final ConfigProperty<Integer> ERROR_TABLE_INSERT_PARALLELISM_VALUE = ConfigProperty
+      .key("hoodie.deltastreamer.errortable.insert.shuffle.parallelism")
+      .defaultValue(200)
+      .withDocumentation("Config to set insert shuffle parallelism. The config is similar to "
+          + "hoodie.insert.shuffle.parallelism config but applies to the error table.");
+
+  public static final ConfigProperty<String> ERROR_TABLE_WRITE_CLASS = ConfigProperty
+      .key("hoodie.deltastreamer.errortable.write.class")
+      .noDefaultValue()
+      .withDocumentation("Class which handles the error table writes. This config is used to configure "
+          + "a custom implementation for Error Table Writer. Specify the full class name of the custom "
+          + "error table writer as a value for this config");
+
+  public static final ConfigProperty<Boolean> ERROR_ENABLE_VALIDATE_TARGET_SCHEMA = ConfigProperty
+      .key("hoodie.deltastreamer.errortable.validate.targetschema.enable")
+      .defaultValue(false)
+      .withDocumentation("Records with schema mismatch with Target Schema are sent to Error Table.");
+
+  public static final ConfigProperty<String> ERROR_TABLE_WRITE_FAILURE_STRATEGY = ConfigProperty
+      .key("hoodie.deltastreamer.errortable.write.failure.strategy")
+      .defaultValue(ErrorWriteFailureStrategy.ROLLBACK_COMMIT.name())
+      .withDocumentation("The config specifies the failure strategy if error table write fails. "
+          + "Use one of - " + Arrays.toString(ErrorWriteFailureStrategy.values()));
+
+  public enum ErrorWriteFailureStrategy {
+    ROLLBACK_COMMIT("Rollback the corresponding base table write commit for which the error events were triggered"),

Review Comment:
   Instead of ROLLBACK_COMMIT, should we name it "FAIL_WRITES"? Because an error could happen anywhere in the lifecycle, right? Somehow ROLLBACK_COMMIT feels like the write succeeds and then we roll it back if there are errors.
   What happens if there are error records while consuming from the source or during transformation?
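   A sketch of the suggested rename (descriptions are illustrative):
   
   ```java
   public enum ErrorWriteFailureStrategy {
     // Renamed from ROLLBACK_COMMIT: conveys that the base table write fails
     // whenever the error table write fails, wherever in the lifecycle the
     // error events originated (source ingestion, transformation, or write).
     FAIL_WRITES("Fail the base table write if the error table write fails"),
     LOG_ERROR("Log the error table write failure and commit the base table write");
   
     private final String description;
   
     ErrorWriteFailureStrategy(String description) {
       this.description = description;
     }
   }
   ```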



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala:
##########
@@ -116,10 +116,79 @@ object HoodieSparkUtils extends SparkAdapterSupport with SparkVersionsSupport {
     }, SQLConf.get)
   }
 
-  def getCatalystRowSerDe(structType: StructType) : SparkRowSerDe = {
-    sparkAdapter.createSparkRowSerDe(structType)
-  }
-
   private def injectSQLConf[T: ClassTag](rdd: RDD[T], conf: SQLConf): RDD[T] =
     new SQLConfInjectingRDD(rdd, conf)
+
+  def safeCreateRDD(df: DataFrame, structName: String, recordNamespace: String, reconcileToLatestSchema: Boolean,
+                    latestTableSchema: org.apache.hudi.common.util.Option[Schema] = org.apache.hudi.common.util.Option.empty()):
+  Tuple2[RDD[GenericRecord], RDD[String]] = {

Review Comment:
   Are these moved from elsewhere?



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/ErrorEvent.java:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.deltastreamer;
+
+public class ErrorEvent<T> {
+
+  ErrorReason reason;

Review Comment:
   Make these fields private and final.
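   A minimal version of the suggestion:
   
   ```java
   // immutable: assigned once in the constructor, never reassigned
   private final ErrorReason reason;
   private final T payload;
   ```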



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java:
##########
@@ -118,11 +177,24 @@ public InputBatch<Dataset<Row>> fetchNewDataInRowFormat(Option<String> lastCkptS
       case JSON: {
         InputBatch<JavaRDD<String>> r = ((Source<JavaRDD<String>>) source).fetchNext(lastCkptStr, sourceLimit);
         Schema sourceSchema = r.getSchemaProvider().getSourceSchema();
-        StructType dataType = AvroConversionUtils.convertAvroSchemaToStructType(sourceSchema);
-        return new InputBatch<>(
-            Option.ofNullable(
-                r.getBatch().map(rdd -> source.getSparkSession().read().schema(dataType).json(rdd)).orElse(null)),
-            r.getCheckpointForNextBatch(), r.getSchemaProvider());
+        if (errorTableWriterInterface.isPresent()) {
+          StructType dataType = AvroConversionUtils.convertAvroSchemaToStructType(sourceSchema)

Review Comment:
   Add Javadocs wherever we invoke the error table, to explain what kind of processing we are doing.
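   For example, something along these lines above the JSON branch (wording is a suggestion):
   
   ```java
   // When the error table writer is present, the JSON is read with the
   // corrupt-record column configured, so rows that fail parsing are captured
   // in that column and routed to the error table as error events instead of
   // failing the whole batch.
   ```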



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/BaseErrorTableWriter.java:
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.deltastreamer;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.VisibleForTesting;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SparkSession;
+
+/**
+ * The class which handles error events when Delta Streamer syncs data from the source. All the
+ * records which Delta Streamer is not able to process are sent as error events to
+ * BaseErrorTableWriter. The implementation of BaseErrorTableWriter processes
+ * these error events through the addErrorEvents API and commits them to the error table when
+ * the upsertAndCommit API is called.
+ *
+ * The writer can use the configs defined in HoodieErrorTableConfig to manage the error table.
+ */
+public abstract class BaseErrorTableWriter<T extends ErrorEvent> {
+
+  // The column name passed to Spark for option `columnNameOfCorruptRecord`. The record
+  // is set to this column in case of an error
+  public static final String ERROR_TABLE_CORRUPT_RECORD_COL_NAME = "_corrupt_record";
+
+  public BaseErrorTableWriter(HoodieDeltaStreamer.Config cfg, SparkSession sparkSession,
+                                   TypedProperties props, JavaSparkContext jssc, FileSystem fs) {
+  }
+
+  /**
+   * Processes input error events. These error events would be committed later through the
+   * upsertAndCommit API call.
+   *
+   * @param errorEvent Input error event RDD
+   */
+  public abstract void addErrorEvents(JavaRDD<T> errorEvent);
+
+  /**
+   * Fetches the error events RDD processed by the writer so far. This is a test API.
+   */
+  @VisibleForTesting
+  public abstract Option<JavaRDD<HoodieAvroRecord>> getErrorEvents(String baseTableInstantTime, Option<String> committedInstantTime);
+
+  /**
+   * This API is called to commit the error events (failed Hoodie Records) processed by the writer so far.
+   * These records are committed to an error table.
+   */
+  public abstract boolean upsertAndCommit(String baseTableInstantTime, Option<String> committedInstantTime);

Review Comment:
   Let's say there are concurrent error events (from the regular writer and from an async table service). How would this pan out? I assume there will be only one instance of the error table writer? Or will there be one per thread?



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/BaseErrorTableWriter.java:
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.deltastreamer;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.VisibleForTesting;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SparkSession;
+
+/**
+ * The class which handles error events when Delta Streamer syncs data from the source. All the
+ * records which Delta Streamer is not able to process are sent as error events to
+ * BaseErrorTableWriter. The implementation of BaseErrorTableWriter processes
+ * these error events through the addErrorEvents API and commits them to the error table when
+ * the upsertAndCommit API is called.
+ *
+ * The writer can use the configs defined in HoodieErrorTableConfig to manage the error table.
+ */
+public abstract class BaseErrorTableWriter<T extends ErrorEvent> {
+
+  // The column name passed to Spark for option `columnNameOfCorruptRecord`. The record
+  // is set to this column in case of an error
+  public static final String ERROR_TABLE_CORRUPT_RECORD_COL_NAME = "_corrupt_record";
+
+  public BaseErrorTableWriter(HoodieDeltaStreamer.Config cfg, SparkSession sparkSession,
+                                   TypedProperties props, JavaSparkContext jssc, FileSystem fs) {
+  }
+
+  /**
+   * Processes input error events. These error events would be committed later through the
+   * upsertAndCommit API call.
+   *
+   * @param errorEvent Input error event RDD
+   */
+  public abstract void addErrorEvents(JavaRDD<T> errorEvent);
+
+  /**
+   * Fetches the error events RDD processed by the writer so far. This is a test API.
+   */
+  @VisibleForTesting
+  public abstract Option<JavaRDD<HoodieAvroRecord>> getErrorEvents(String baseTableInstantTime, Option<String> committedInstantTime);
+
+  /**
+   * This API is called to commit the error events (failed Hoodie Records) processed by the writer so far.
+   * These records are committed to a error table.
+   */
+  public abstract boolean upsertAndCommit(String baseTableInstantTime, Option<String> committedInstantTime);

Review Comment:
   I don't mean to drag this patch further, but thinking out loud: similar to the typical startTxn, performOperation, and then endTxn/commitTxn (in DB or data-system APIs), does it make sense to add a startCommit API to this interface? I see addErrorEvents mapping to performOperation and upsertAndCommit mapping to endTxn/commitTxn, but I don't see anything to denote that we are starting a new commit.
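   A rough sketch of that lifecycle on this interface; the `startCommit` method below is hypothetical:
   
   ```java
   import org.apache.hudi.common.util.Option;
   
   import org.apache.spark.api.java.JavaRDD;
   
   public abstract class BaseErrorTableWriter<T extends ErrorEvent> {
   
     // startTxn: open a new error table commit window tied to the base table instant.
     public abstract void startCommit(String baseTableInstantTime);
   
     // performOperation: accumulate error events for the open window.
     public abstract void addErrorEvents(JavaRDD<T> errorEvents);
   
     // endTxn / commitTxn: persist everything accumulated since startCommit.
     public abstract boolean upsertAndCommit(String baseTableInstantTime,
                                             Option<String> committedInstantTime);
   }
   ```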
   



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/BaseErrorTableWriter.java:
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.deltastreamer;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.VisibleForTesting;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SparkSession;
+
+/**
+ * The class which handles error events when Delta Streamer syncs data from the source. All the
+ * records which Delta Streamer is not able to process are sent as error events to

Review Comment:
   Again, let's keep this agnostic of the write method.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java:
##########
@@ -494,9 +505,38 @@ private Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> fetchFromSourc
       Option<Dataset<Row>> transformed =
           dataAndCheckpoint.getBatch().map(data -> transformer.get().apply(jssc, sparkSession, data, props));
 
+      transformed = formatAdapter.processErrorEvents(transformed,
+          ErrorEvent.ErrorReason.CUSTOM_TRANSFORMER_FAILURE);
+
       checkpointStr = dataAndCheckpoint.getCheckpointForNextBatch();
       boolean reconcileSchema = props.getBoolean(DataSourceWriteOptions.RECONCILE_SCHEMA().key());
       if (this.userProvidedSchemaProvider != null && this.userProvidedSchemaProvider.getTargetSchema() != null) {
+        // If the target schema is specified through Avro schema,
+        // pass in the schema for the Row-to-Avro conversion
+        // to avoid nullability mismatch between Avro schema and Row schema
+        Option<BaseErrorTableWriter> schemaValidationErrorWriter =
+            (errorTableWriter.isPresent()
+                && props.getBoolean(HoodieErrorTableConfig.ERROR_ENABLE_VALIDATE_TARGET_SCHEMA.key(),
+                HoodieErrorTableConfig.ERROR_ENABLE_VALIDATE_TARGET_SCHEMA.defaultValue()))
+                ? errorTableWriter : Option.empty();
+        avroRDDOptional = transformed
+            .map(row ->
+                schemaValidationErrorWriter
+                    .map(impl -> {
+                      Tuple2<RDD<GenericRecord>, RDD<String>> safeCreateRDDs = HoodieSparkUtils.safeCreateRDD(row,
+                          HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE, reconcileSchema,
+                          Option.of(this.userProvidedSchemaProvider.getTargetSchema())
+                      );
+                      impl.addErrorEvents(safeCreateRDDs._2().toJavaRDD()
+                          .map(evStr -> new ErrorEvent<>(evStr,
+                              ErrorEvent.ErrorReason.AVRO_DESERIALIZATION_FAILURE)));
+                      return safeCreateRDDs._1();
+                    })
+                    .orElseGet(() -> HoodieSparkUtils.createRdd(row,
+                      HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE, reconcileSchema,

Review Comment:
   Let's move this to a separate method and re-use it around L562 as well.
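   Something like the following could be shared by both call sites (the method name and the exact `createRdd`/`safeCreateRDD` parameters are placeholders based on the snippet above):
   
   ```java
   // Hypothetical extraction of the lambda above, so the schema-validated and
   // plain createRdd paths live in one place and can be re-used further down.
   private RDD<GenericRecord> createRecordRdd(Dataset<Row> rows,
       Option<BaseErrorTableWriter> errorTableWriter, boolean reconcileSchema,
       Option<Schema> targetSchema) {
     return errorTableWriter.map(writer -> {
       // safeCreateRDD returns (convertible records, failed records as JSON strings)
       Tuple2<RDD<GenericRecord>, RDD<String>> rdds = HoodieSparkUtils.safeCreateRDD(rows,
           HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE, reconcileSchema, targetSchema);
       writer.addErrorEvents(rdds._2().toJavaRDD()
           .map(evStr -> new ErrorEvent<>(evStr, ErrorEvent.ErrorReason.AVRO_DESERIALIZATION_FAILURE)));
       return rdds._1();
     }).orElseGet(() -> HoodieSparkUtils.createRdd(rows,
         HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE, reconcileSchema, targetSchema));
   }
   ```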



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java:
##########
@@ -710,6 +765,26 @@ private Pair<Option<String>, JavaRDD<WriteStatus>> writeToSink(JavaRDD<HoodieRec
             + totalErrorRecords + "/" + totalRecords);
       }
       String commitActionType = CommitUtils.getCommitActionType(cfg.operation, HoodieTableType.valueOf(cfg.tableType));
+      if (errorTableWriter.isPresent()) {
+        // Removing writeStatus events from error events, as an action on writeStatus can cause the base table DAG
+        // to re-execute if the original cached dataframe gets unpersisted before this action.
+        //        errorTableWriterInterfaceImpl.get().addErrorEvents(getErrorEventsForWriteStatus(writeStatusRDD));
+        Option<String> committedInstantTime = getLatestInstantWithValidCheckpointInfo(commitTimelineOpt);
+        boolean errorTableSuccess = errorTableWriter.get().upsertAndCommit(instantTime, committedInstantTime);
+        if (!errorTableSuccess) {
+          switch (errorWriteFailureStrategy) {
+            case ROLLBACK_COMMIT:
+              LOG.error("Commit " + instantTime + " failed!");
+              writeClient.rollback(instantTime);
+              throw new HoodieException("Error Table Commit failed!");
+            case LOG_ERROR:
+              LOG.error("Error Table write failed for instant " + instantTime);

Review Comment:
   Can we log more context here: the error, an exception stacktrace, or some other info?
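   For instance, if `upsertAndCommit` surfaced the underlying failure (the `failureCause` below is hypothetical), this branch could log actionable context:
   
   ```java
   case LOG_ERROR:
     // Hypothetical: assumes the writer exposes the underlying throwable
     // instead of only returning a boolean from upsertAndCommit.
     LOG.error("Error table write failed for instant " + instantTime
         + " (base path: " + cfg.targetBasePath + "); proceeding with base table commit",
         failureCause);
     break;
   ```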
   



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java:
##########
@@ -51,10 +62,56 @@ public final class SourceFormatAdapter implements Closeable {
 
   private final Source source;
 
+  private Option<BaseErrorTableWriter> errorTableWriterInterface = Option.empty();

Review Comment:
   Let's ensure we have consistent naming: `errorTableWriter`.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java:
##########
@@ -494,9 +505,38 @@ private Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> fetchFromSourc
       Option<Dataset<Row>> transformed =
           dataAndCheckpoint.getBatch().map(data -> transformer.get().apply(jssc, sparkSession, data, props));
 
+      transformed = formatAdapter.processErrorEvents(transformed,
+          ErrorEvent.ErrorReason.CUSTOM_TRANSFORMER_FAILURE);
+
       checkpointStr = dataAndCheckpoint.getCheckpointForNextBatch();
       boolean reconcileSchema = props.getBoolean(DataSourceWriteOptions.RECONCILE_SCHEMA().key());
       if (this.userProvidedSchemaProvider != null && this.userProvidedSchemaProvider.getTargetSchema() != null) {
+        // If the target schema is specified through Avro schema,
+        // pass in the schema for the Row-to-Avro conversion
+        // to avoid nullability mismatch between Avro schema and Row schema
+        Option<BaseErrorTableWriter> schemaValidationErrorWriter =

Review Comment:
   errorTableWriterForSchemaValidation



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/ErrorEvent.java:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.deltastreamer;
+
+public class ErrorEvent<T> {

Review Comment:
   Add Javadocs for this class.
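   For example (wording is a suggestion):
   
   ```java
   /**
    * Wraps a single record that could not be processed, together with the
    * {@link ErrorReason} identifying the pipeline stage where it failed.
    *
    * @param <T> the record representation at the failing stage, e.g. a raw
    *            JSON string or an Avro record
    */
   public class ErrorEvent<T> {
   ```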



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/ErrorEvent.java:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.deltastreamer;
+
+public class ErrorEvent<T> {
+
+  ErrorReason reason;
+  T payload;
+
+  public ErrorEvent(T payload, ErrorReason reason) {
+    this.payload = payload;
+    this.reason = reason;
+  }
+
+  public T getPayload() {
+    return payload;
+  }
+
+  public ErrorReason getReason() {
+    return reason;
+  }
+
+  public enum ErrorReason {

Review Comment:
   Add Javadocs for this enum.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/ErrorEvent.java:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.deltastreamer;
+
+public class ErrorEvent<T> {
+
+  ErrorReason reason;
+  T payload;
+
+  public ErrorEvent(T payload, ErrorReason reason) {
+    this.payload = payload;
+    this.reason = reason;
+  }
+
+  public T getPayload() {
+    return payload;
+  }
+
+  public ErrorReason getReason() {
+    return reason;
+  }
+
+  public enum ErrorReason {
+    JSON_AVRO_DESERIALIZATION_FAILURE,
+    JSON_ROW_DESERIALIZATION_FAILURE,
+    AVRO_DESERIALIZATION_FAILURE,

Review Comment:
   We should try to add docs for each entry here.
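   For instance, with descriptions inferred from how the reasons are used in this patch:
   
   ```java
   public enum ErrorReason {
     // Source JSON could not be deserialized into an Avro record.
     JSON_AVRO_DESERIALIZATION_FAILURE,
     // Source JSON could not be parsed into a Spark Row.
     JSON_ROW_DESERIALIZATION_FAILURE,
     // Row-to-Avro conversion against the target schema failed.
     AVRO_DESERIALIZATION_FAILURE,
     // A user-provided transformer failed while transforming the batch.
     CUSTOM_TRANSFORMER_FAILURE
   }
   ```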



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieErrorTableConfig.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.config;
+
+import org.apache.hudi.common.config.ConfigClassProperty;
+import org.apache.hudi.common.config.ConfigGroups;
+import org.apache.hudi.common.config.ConfigProperty;
+
+import javax.annotation.concurrent.Immutable;
+
+import java.util.Arrays;
+
+@Immutable
+@ConfigClassProperty(name = "Error table Configs",
+    groupName = ConfigGroups.Names.WRITE_CLIENT,
+    description = "Configurations that are required for Error table configs")
+public class HoodieErrorTableConfig {
+  public static final ConfigProperty<Boolean> ERROR_TABLE_ENABLED = ConfigProperty
+      .key("hoodie.deltastreamer.errortable.enable")
+      .defaultValue(false)
+      .withDocumentation("Config to enable error table. If the config is enabled, "
+          + "all the records with processing error in DeltaStreamer are transferred to error table.");
+
+  public static final ConfigProperty<String> ERROR_TABLE_BASE_PATH = ConfigProperty
+      .key("hoodie.deltastreamer.errortable.base.path")
+      .noDefaultValue()
+      .withDocumentation("Base path for error table under which all error records "
+          + "would be stored.");
+
+  public static final ConfigProperty<String> ERROR_TARGET_TABLE = ConfigProperty
+      .key("hoodie.deltastreamer.errortable.target.table.name")
+      .noDefaultValue()
+      .withDocumentation("Table name to be used for the error table");
+
+  public static final ConfigProperty<Integer> ERROR_TABLE_UPSERT_PARALLELISM_VALUE = ConfigProperty
+      .key("hoodie.deltastreamer.errortable.upsert.shuffle.parallelism")
+      .defaultValue(200)
+      .withDocumentation("Config to set upsert shuffle parallelism. The config is similar to "
+          + "hoodie.upsert.shuffle.parallelism config but applies to the error table.");
+
+  public static final ConfigProperty<Integer> ERROR_TABLE_INSERT_PARALLELISM_VALUE = ConfigProperty
+      .key("hoodie.deltastreamer.errortable.insert.shuffle.parallelism")
+      .defaultValue(200)
+      .withDocumentation("Config to set insert shuffle parallelism. The config is similar to "
+          + "hoodie.insert.shuffle.parallelism config but applies to the error table.");
+
+  public static final ConfigProperty<String> ERROR_TABLE_WRITE_CLASS = ConfigProperty
+      .key("hoodie.deltastreamer.errortable.write.class")
+      .noDefaultValue()
+      .withDocumentation("Class which handles the error table writes. This config is used to configure "
+          + "a custom implementation for Error Table Writer. Specify the full class name of the custom "
+          + "error table writer as a value for this config");
+
+  public static final ConfigProperty<Boolean> ERROR_ENABLE_VALIDATE_TARGET_SCHEMA = ConfigProperty
+      .key("hoodie.deltastreamer.errortable.validate.targetschema.enable")
+      .defaultValue(false)
+      .withDocumentation("Records with schema mismatch with Target Schema are sent to Error Table.");
+
+  public static final ConfigProperty<String> ERROR_TABLE_WRITE_FAILURE_STRATEGY = ConfigProperty
+      .key("hoodie.deltastreamer.errortable.write.failure.strategy")
+      .defaultValue(ErrorWriteFailureStrategy.ROLLBACK_COMMIT.name())
+      .withDocumentation("The config specifies the failure strategy if error table write fails. "
+          + "Use one of - " + Arrays.toString(ErrorWriteFailureStrategy.values()));

Review Comment:
   Maybe not required in this patch, but let's see if it makes sense to add another config for the following scenario:
   if the error table is enabled, but there is some uncaught failure while writing the records to the error table, how do we want to proceed? I guess as per this patch the regular writes may fail, since I don't see any try/finally block. But I'm wondering if we should give the user a config knob to either proceed without failing the regular writes, or fail the regular writes to the data table as well.
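   A possible shape for that knob (the key name and default are hypothetical):
   
   ```java
   public static final ConfigProperty<Boolean> ERROR_TABLE_FAILURE_FAILS_BASE_WRITE = ConfigProperty
       .key("hoodie.errortable.write.failure.fail.base.write")
       .defaultValue(true)
       .withDocumentation("When true, an uncaught failure while writing records to the error "
           + "table also fails the regular write to the data table. When false, the failure "
           + "is logged and the regular write proceeds.");
   ```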
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org