Posted to commits@hudi.apache.org by "lokeshj1703 (via GitHub)" <gi...@apache.org> on 2023/02/16 12:06:25 UTC

[GitHub] [hudi] lokeshj1703 opened a new pull request, #7982: Quarantine table support for error events

lokeshj1703 opened a new pull request, #7982:
URL: https://github.com/apache/hudi/pull/7982

   ### Change Logs
   
   This PR adds the capability to write source conversion error events to a quarantine table.
   *Approach*:
   
   -  Track error events in `SourceFormatAdapter`
   -  Track error events from `WriteStatus`
   -  Add a new write client for the quarantine table, `BaseQuarantineTableWriter`
   -  If the commit to the quarantine table fails, throw an error so that the delta sync fails.
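   
   The commit flow in the last bullet can be sketched as follows. This is a simplified, Spark-free illustration; the class and method names (`QuarantineTableWriter`, `deltaSyncCommit`) are hypothetical stand-ins, not the actual Hudi implementation:
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   
   public class QuarantineFlowSketch {
   
     /** Minimal stand-in for the quarantine table write client. */
     static class QuarantineTableWriter {
       final List<String> errorEvents = new ArrayList<>();
       boolean failCommit; // toggle to simulate a failed quarantine commit
   
       String startCommit() { return "20230216120625"; } // instant-time stand-in
   
       void addErrorEvents(List<String> events) { errorEvents.addAll(events); }
   
       boolean upsertAndCommit(String instant) { return !failCommit; }
     }
   
     /** Returns true if the sync commit succeeds; throws if the quarantine commit fails. */
     static boolean deltaSyncCommit(QuarantineTableWriter writer, List<String> errors) {
       String instant = writer.startCommit();
       writer.addErrorEvents(errors);
       if (!writer.upsertAndCommit(instant)) {
         // Quarantine commit failed: propagate so the delta sync itself fails.
         throw new RuntimeException("Quarantine table commit " + instant + " failed");
       }
       return true;
     }
   
     public static void main(String[] args) {
       QuarantineTableWriter ok = new QuarantineTableWriter();
       System.out.println(deltaSyncCommit(ok, List.of("bad-record-1")));
   
       QuarantineTableWriter bad = new QuarantineTableWriter();
       bad.failCommit = true;
       try {
         deltaSyncCommit(bad, List.of("bad-record-2"));
       } catch (RuntimeException e) {
         System.out.println("delta sync failed as expected");
       }
     }
   }
   ```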
   
   ### Impact
   
   NA
   
   ### Risk level (write none, low, medium, or high below)
   
   Low (The feature is disabled by default)
   
   ### Documentation Update
   
   _Describe any necessary documentation update if there is any new feature, config, or user-facing change_
   
   - _The config description must be updated if new configs are added or the default value of the configs are changed_
   - _Any new feature or user-facing change requires updating the Hudi website. Please create a Jira ticket, attach the
     ticket number here and follow the [instruction](https://hudi.apache.org/contribute/developer-setup#website) to make
     changes to the website._
   
   ### Contributor's checklist
   
   - [ ] Read through [contributor's guide](https://hudi.apache.org/contribute/how-to-contribute)
   - [ ] Change Logs and Impact were stated clearly
   - [ ] Adequate tests were added if applicable
   - [ ] CI passed
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] hudi-bot commented on pull request #7982: [HUDI-5813] Quarantine table support for error events

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #7982:
URL: https://github.com/apache/hudi/pull/7982#issuecomment-1433899119

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "a5cad5f60143509cf837f8b01467b660fefa8f77",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15246",
       "triggerID" : "a5cad5f60143509cf837f8b01467b660fefa8f77",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * a5cad5f60143509cf837f8b01467b660fefa8f77 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15246) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>




[GitHub] [hudi] hudi-bot commented on pull request #7982: [HUDI-5813] Error table support for error events

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #7982:
URL: https://github.com/apache/hudi/pull/7982#issuecomment-1446324045

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "a5cad5f60143509cf837f8b01467b660fefa8f77",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15246",
       "triggerID" : "a5cad5f60143509cf837f8b01467b660fefa8f77",
       "triggerType" : "PUSH"
     }, {
       "hash" : "30f2fc136656bbe0567dd7c0cf15916e0cc06af4",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15278",
       "triggerID" : "30f2fc136656bbe0567dd7c0cf15916e0cc06af4",
       "triggerType" : "PUSH"
     }, {
       "hash" : "1b159da06fc8f6521ecfb5e047955e0ba3156730",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15296",
       "triggerID" : "1b159da06fc8f6521ecfb5e047955e0ba3156730",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f705e62c81684f03d025c614a180673bc16da74b",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15314",
       "triggerID" : "f705e62c81684f03d025c614a180673bc16da74b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "667b1ef4eb7d9e344119d63f75bdcb8ccd1664b7",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15338",
       "triggerID" : "667b1ef4eb7d9e344119d63f75bdcb8ccd1664b7",
       "triggerType" : "PUSH"
     }, {
       "hash" : "b6ef7c09ed66456fa285d6136360ed35087c3de7",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15339",
       "triggerID" : "b6ef7c09ed66456fa285d6136360ed35087c3de7",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5e157e5bd199cc6e4828f00cb4b81d095eee3e5a",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "5e157e5bd199cc6e4828f00cb4b81d095eee3e5a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "016ef6eb042f6181d35095866978a9d4744c7b14",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15359",
       "triggerID" : "016ef6eb042f6181d35095866978a9d4744c7b14",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8ae41d105c88a5e21ac4692ace7f4b2cd4c834b6",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15361",
       "triggerID" : "8ae41d105c88a5e21ac4692ace7f4b2cd4c834b6",
       "triggerType" : "PUSH"
     }, {
       "hash" : "dfadbc7d9c1b499d518a6af3d8b78cf169e3a0c6",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "dfadbc7d9c1b499d518a6af3d8b78cf169e3a0c6",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 5e157e5bd199cc6e4828f00cb4b81d095eee3e5a UNKNOWN
   * 8ae41d105c88a5e21ac4692ace7f4b2cd4c834b6 Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15361) 
   * dfadbc7d9c1b499d518a6af3d8b78cf169e3a0c6 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>




[GitHub] [hudi] hudi-bot commented on pull request #7982: [HUDI-5813] Error table support for error events

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #7982:
URL: https://github.com/apache/hudi/pull/7982#issuecomment-1448179842

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "a5cad5f60143509cf837f8b01467b660fefa8f77",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15246",
       "triggerID" : "a5cad5f60143509cf837f8b01467b660fefa8f77",
       "triggerType" : "PUSH"
     }, {
       "hash" : "30f2fc136656bbe0567dd7c0cf15916e0cc06af4",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15278",
       "triggerID" : "30f2fc136656bbe0567dd7c0cf15916e0cc06af4",
       "triggerType" : "PUSH"
     }, {
       "hash" : "1b159da06fc8f6521ecfb5e047955e0ba3156730",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15296",
       "triggerID" : "1b159da06fc8f6521ecfb5e047955e0ba3156730",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f705e62c81684f03d025c614a180673bc16da74b",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15314",
       "triggerID" : "f705e62c81684f03d025c614a180673bc16da74b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "667b1ef4eb7d9e344119d63f75bdcb8ccd1664b7",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15338",
       "triggerID" : "667b1ef4eb7d9e344119d63f75bdcb8ccd1664b7",
       "triggerType" : "PUSH"
     }, {
       "hash" : "b6ef7c09ed66456fa285d6136360ed35087c3de7",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15339",
       "triggerID" : "b6ef7c09ed66456fa285d6136360ed35087c3de7",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5e157e5bd199cc6e4828f00cb4b81d095eee3e5a",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "5e157e5bd199cc6e4828f00cb4b81d095eee3e5a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "016ef6eb042f6181d35095866978a9d4744c7b14",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15359",
       "triggerID" : "016ef6eb042f6181d35095866978a9d4744c7b14",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8ae41d105c88a5e21ac4692ace7f4b2cd4c834b6",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15361",
       "triggerID" : "8ae41d105c88a5e21ac4692ace7f4b2cd4c834b6",
       "triggerType" : "PUSH"
     }, {
       "hash" : "dfadbc7d9c1b499d518a6af3d8b78cf169e3a0c6",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "dfadbc7d9c1b499d518a6af3d8b78cf169e3a0c6",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ab98a8996106108a6afad74de71c69f229cd550d",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15442",
       "triggerID" : "ab98a8996106108a6afad74de71c69f229cd550d",
       "triggerType" : "PUSH"
     }, {
       "hash" : "242735ddb5c277194fc7a6ea42b8dd0ca9bbe594",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15466",
       "triggerID" : "242735ddb5c277194fc7a6ea42b8dd0ca9bbe594",
       "triggerType" : "PUSH"
     }, {
       "hash" : "252c184b2499599f428ee499231ebfd945628493",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15471",
       "triggerID" : "252c184b2499599f428ee499231ebfd945628493",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 5e157e5bd199cc6e4828f00cb4b81d095eee3e5a UNKNOWN
   * dfadbc7d9c1b499d518a6af3d8b78cf169e3a0c6 UNKNOWN
   * 252c184b2499599f428ee499231ebfd945628493 Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15471) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>




[GitHub] [hudi] codope merged pull request #7982: [HUDI-5813] Error table support for error events

Posted by "codope (via GitHub)" <gi...@apache.org>.
codope merged PR #7982:
URL: https://github.com/apache/hudi/pull/7982




[GitHub] [hudi] hudi-bot commented on pull request #7982: [HUDI-5813] Quarantine table support for error events

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #7982:
URL: https://github.com/apache/hudi/pull/7982#issuecomment-1436885796

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "a5cad5f60143509cf837f8b01467b660fefa8f77",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15246",
       "triggerID" : "a5cad5f60143509cf837f8b01467b660fefa8f77",
       "triggerType" : "PUSH"
     }, {
       "hash" : "30f2fc136656bbe0567dd7c0cf15916e0cc06af4",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15278",
       "triggerID" : "30f2fc136656bbe0567dd7c0cf15916e0cc06af4",
       "triggerType" : "PUSH"
     }, {
       "hash" : "1b159da06fc8f6521ecfb5e047955e0ba3156730",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15296",
       "triggerID" : "1b159da06fc8f6521ecfb5e047955e0ba3156730",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 30f2fc136656bbe0567dd7c0cf15916e0cc06af4 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15278) 
   * 1b159da06fc8f6521ecfb5e047955e0ba3156730 Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15296) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>




[GitHub] [hudi] codope commented on a diff in pull request #7982: [HUDI-5813] Quarantine table support for error events

Posted by "codope (via GitHub)" <gi...@apache.org>.
codope commented on code in PR #7982:
URL: https://github.com/apache/hudi/pull/7982#discussion_r1109675049


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/BaseQuarantineTableWriter.java:
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.deltastreamer;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SparkSession;
+
+public abstract class BaseQuarantineTableWriter<T extends QuarantineEvent> {

Review Comment:
   Should we rename to `BaseHoodieAuditTableWriter`, `HoodieAuditTableConfigs`, `AuditEvent`, `AuditErrorEvent` and `AuditUtils`? Basically, I am suggesting evolving this into a more general audit framework. Error is just one type of event; others could keep a record of updates/deletions in this audit table.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/QuarantineEvent.java:
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.deltastreamer;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.spark.sql.types.DataType;
+
+public abstract class QuarantineEvent<T> {
+
+  QuarantineReason reason;
+  T payload;
+
+  public abstract DataType getPayloadType();

Review Comment:
   This method is not used. Remove it?



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java:
##########
@@ -746,6 +786,29 @@ private Pair<Option<String>, JavaRDD<WriteStatus>> writeToSink(JavaRDD<HoodieRec
     return Pair.of(scheduledCompactionInstant, writeStatusRDD);
   }
 
+  protected JavaRDD<QuarantineJsonEvent> getErrorEventsForWriteStatus(JavaRDD<WriteStatus> writeStatusRDD) {
+    HoodieWriteConfig config = writeClient.getConfig();
+
+    return writeStatusRDD
+        .filter(WriteStatus::hasErrors)
+        .flatMap(x -> {
+          Schema schema = Schema.parse(config.getSchema());
+          Properties props = config.getPayloadConfig().getProps();
+          return x.getFailedRecords().stream()
+              .map(z -> {
+                HoodieRecordPayload hoodieRecordPayload = (HoodieRecordPayload)z.getData();
+                String recordStr;
+                try {
+                  recordStr = (String) hoodieRecordPayload.getInsertValue(schema,
+                      props).map(value -> value.toString()).get();
+                } catch (IOException e) {
+                  recordStr = null;

Review Comment:
   Should this be a meaningful message rather than null, so that an IOException can be distinguished on the Hudi side?
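   
   One way to address this could look like the following sketch; the helper name and interface are hypothetical, standing in for the payload's `getInsertValue` call. The idea is to record a descriptive marker instead of null when deserialization throws:
   
   ```java
   import java.io.IOException;
   
   public class FailedRecordFormatter {
   
     // Stand-in for a payload read that may throw IOException.
     interface PayloadReader {
       String read() throws IOException;
     }
   
     /**
      * Returns the record string, or a descriptive marker that distinguishes
      * a deserialization IOException from a genuinely null payload.
      */
     static String recordStringOrError(PayloadReader payload) {
       try {
         return payload.read();
       } catch (IOException e) {
         // Instead of null, keep enough context to tell this failure apart.
         return "DESERIALIZATION_FAILURE: " + e.getMessage();
       }
     }
   
     public static void main(String[] args) {
       System.out.println(recordStringOrError(() -> "{\"id\": 1}"));
       System.out.println(recordStringOrError(() -> { throw new IOException("bad avro bytes"); }));
     }
   }
   ```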



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/QuarantineEvent.java:
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.deltastreamer;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.spark.sql.types.DataType;
+
+public abstract class QuarantineEvent<T> {
+
+  QuarantineReason reason;
+  T payload;
+
+  public abstract DataType getPayloadType();
+
+  public abstract String getPayload();
+
+  public QuarantineReason getReason() {
+    return reason;
+  }
+
+  public abstract GenericRecord getAvroPayload();
+
+  public enum QuarantineReason {
+    JSON_AVRO_DESERIALIZATION_FAILURE,

Review Comment:
   Should we have error codes for these failure reasons? If you agree, it can be taken up as a follow-up, but what do you think of the suggestion?
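   
   The error-code suggestion could be sketched as below, pairing each reason with a stable numeric code. The codes and enum constants here are illustrative only, not part of the PR:
   
   ```java
   public class QuarantineReasonSketch {
   
     // Hypothetical mapping of failure reasons to stable error codes.
     enum Reason {
       JSON_AVRO_DESERIALIZATION_FAILURE(1001),
       AVRO_DESERIALIZATION_FAILURE(1002),
       WRITE_FAILURE(2001);
   
       private final int code;
   
       Reason(int code) { this.code = code; }
   
       int code() { return code; }
     }
   
     public static void main(String[] args) {
       for (Reason r : Reason.values()) {
         System.out.println(r.code() + " -> " + r);
       }
     }
   }
   ```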



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java:
##########
@@ -117,11 +174,23 @@ public InputBatch<Dataset<Row>> fetchNewDataInRowFormat(Option<String> lastCkptS
       case JSON: {
         InputBatch<JavaRDD<String>> r = ((Source<JavaRDD<String>>) source).fetchNext(lastCkptStr, sourceLimit);
         Schema sourceSchema = r.getSchemaProvider().getSourceSchema();
-        StructType dataType = AvroConversionUtils.convertAvroSchemaToStructType(sourceSchema);
-        return new InputBatch<>(
-            Option.ofNullable(
-                r.getBatch().map(rdd -> source.getSparkSession().read().schema(dataType).json(rdd)).orElse(null)),
-            r.getCheckpointForNextBatch(), r.getSchemaProvider());
+        if (quarantineTableWriterInterface.isPresent()) {
+          StructType dataType = AvroConversionUtils.convertAvroSchemaToStructType(sourceSchema)
+              .add(new StructField(QUARANTINE_TABLE_CURRUPT_RECORD_COL_NAME, DataTypes.StringType, true, Metadata.empty()));
+          Option<Dataset<Row>> dataset = r.getBatch().map(rdd -> source.getSparkSession().read()
+              .option("mode", "PERMISSIVE").option("columnNameOfCorruptRecord", QUARANTINE_TABLE_CURRUPT_RECORD_COL_NAME).schema(dataType)

Review Comment:
   mode is PERMISSIVE by default. No need to set explicitly unless the default has changed in later Spark versions. Can you check?
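   
   For illustration, PERMISSIVE-style handling (keep the row, park the raw text in a corrupt-record column rather than dropping it) can be mimicked without Spark. This plain-Java sketch only imitates that behavior on a toy `key=value` format; it is not how Spark's JSON reader is implemented:
   
   ```java
   import java.util.LinkedHashMap;
   import java.util.List;
   import java.util.Map;
   
   public class PermissiveParseSketch {
   
     static final String CORRUPT_COL = "_corrupt_record"; // Spark's default column name
   
     /**
      * Parses "key=value" lines; a malformed line yields a row whose only field
      * is the corrupt-record column holding the raw input, mirroring Spark's
      * PERMISSIVE mode with columnNameOfCorruptRecord.
      */
     static Map<String, String> parseRow(String line) {
       Map<String, String> row = new LinkedHashMap<>();
       int eq = line.indexOf('=');
       if (eq <= 0) {
         row.put(CORRUPT_COL, line); // keep the bad record instead of dropping it
       } else {
         row.put(line.substring(0, eq), line.substring(eq + 1));
       }
       return row;
     }
   
     public static void main(String[] args) {
       for (String line : List.of("id=42", "not a record")) {
         System.out.println(parseRow(line));
       }
     }
   }
   ```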



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java:
##########
@@ -236,6 +239,8 @@ public class DeltaSync implements Serializable, Closeable {
    */
   private transient SparkRDDWriteClient writeClient;
 
+  private Option<BaseQuarantineTableWriter> quarantineTableWriterInterfaceImpl = Option.empty();
+

Review Comment:
   +1



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java:
##########
@@ -710,6 +738,18 @@ private Pair<Option<String>, JavaRDD<WriteStatus>> writeToSink(JavaRDD<HoodieRec
             + totalErrorRecords + "/" + totalRecords);
       }
       String commitActionType = CommitUtils.getCommitActionType(cfg.operation, HoodieTableType.valueOf(cfg.tableType));
+      if (quarantineTableWriterInterfaceImpl.isPresent()) {
+        String qurantineTableStartInstant = quarantineTableWriterInterfaceImpl.get().startCommit();
+        quarantineTableWriterInterfaceImpl.get().addErrorEvents(getErrorEventsForWriteStatus(writeStatusRDD));
+        Option<String> commitedInstantTime = getLatestInstantWithValidCheckpointInfo(commitTimelineOpt);
+        boolean quarantineTableSuccess = quarantineTableWriterInterfaceImpl.get().upsertAndCommit(qurantineTableStartInstant, instantTime, commitedInstantTime);
+        if (!quarantineTableSuccess) {
+          LOG.info("Qurantine Table Commit " + qurantineTableStartInstant + " failed!");
+          LOG.info("Commit " + instantTime + " failed!");
+          writeClient.rollback(instantTime);

Review Comment:
   That's a good suggestion. Keep rollback as the default behavior.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/QuarantineEvent.java:
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.deltastreamer;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.spark.sql.types.DataType;
+
+public abstract class QuarantineEvent<T> {
+
+  QuarantineReason reason;
+  T payload;
+
+  public abstract DataType getPayloadType();
+
+  public abstract String getPayload();
+
+  public QuarantineReason getReason() {
+    return reason;
+  }
+
+  public abstract GenericRecord getAvroPayload();

Review Comment:
   Remove this as well? I think the payload type is an implementation detail and should not be part of the abstraction.





[GitHub] [hudi] hudi-bot commented on pull request #7982: [HUDI-5813] Quarantine table support for error events

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #7982:
URL: https://github.com/apache/hudi/pull/7982#issuecomment-1439746591

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "a5cad5f60143509cf837f8b01467b660fefa8f77",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15246",
       "triggerID" : "a5cad5f60143509cf837f8b01467b660fefa8f77",
       "triggerType" : "PUSH"
     }, {
       "hash" : "30f2fc136656bbe0567dd7c0cf15916e0cc06af4",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15278",
       "triggerID" : "30f2fc136656bbe0567dd7c0cf15916e0cc06af4",
       "triggerType" : "PUSH"
     }, {
       "hash" : "1b159da06fc8f6521ecfb5e047955e0ba3156730",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15296",
       "triggerID" : "1b159da06fc8f6521ecfb5e047955e0ba3156730",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f705e62c81684f03d025c614a180673bc16da74b",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15314",
       "triggerID" : "f705e62c81684f03d025c614a180673bc16da74b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "667b1ef4eb7d9e344119d63f75bdcb8ccd1664b7",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15338",
       "triggerID" : "667b1ef4eb7d9e344119d63f75bdcb8ccd1664b7",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * f705e62c81684f03d025c614a180673bc16da74b Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15314) 
   * 667b1ef4eb7d9e344119d63f75bdcb8ccd1664b7 Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15338) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>




[GitHub] [hudi] hudi-bot commented on pull request #7982: [HUDI-5813] Error table support for error events

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #7982:
URL: https://github.com/apache/hudi/pull/7982#issuecomment-1446336727

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "a5cad5f60143509cf837f8b01467b660fefa8f77",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15246",
       "triggerID" : "a5cad5f60143509cf837f8b01467b660fefa8f77",
       "triggerType" : "PUSH"
     }, {
       "hash" : "30f2fc136656bbe0567dd7c0cf15916e0cc06af4",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15278",
       "triggerID" : "30f2fc136656bbe0567dd7c0cf15916e0cc06af4",
       "triggerType" : "PUSH"
     }, {
       "hash" : "1b159da06fc8f6521ecfb5e047955e0ba3156730",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15296",
       "triggerID" : "1b159da06fc8f6521ecfb5e047955e0ba3156730",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f705e62c81684f03d025c614a180673bc16da74b",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15314",
       "triggerID" : "f705e62c81684f03d025c614a180673bc16da74b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "667b1ef4eb7d9e344119d63f75bdcb8ccd1664b7",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15338",
       "triggerID" : "667b1ef4eb7d9e344119d63f75bdcb8ccd1664b7",
       "triggerType" : "PUSH"
     }, {
       "hash" : "b6ef7c09ed66456fa285d6136360ed35087c3de7",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15339",
       "triggerID" : "b6ef7c09ed66456fa285d6136360ed35087c3de7",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5e157e5bd199cc6e4828f00cb4b81d095eee3e5a",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "5e157e5bd199cc6e4828f00cb4b81d095eee3e5a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "016ef6eb042f6181d35095866978a9d4744c7b14",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15359",
       "triggerID" : "016ef6eb042f6181d35095866978a9d4744c7b14",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8ae41d105c88a5e21ac4692ace7f4b2cd4c834b6",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15361",
       "triggerID" : "8ae41d105c88a5e21ac4692ace7f4b2cd4c834b6",
       "triggerType" : "PUSH"
     }, {
       "hash" : "dfadbc7d9c1b499d518a6af3d8b78cf169e3a0c6",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "dfadbc7d9c1b499d518a6af3d8b78cf169e3a0c6",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ab98a8996106108a6afad74de71c69f229cd550d",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "ab98a8996106108a6afad74de71c69f229cd550d",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 5e157e5bd199cc6e4828f00cb4b81d095eee3e5a UNKNOWN
   * 8ae41d105c88a5e21ac4692ace7f4b2cd4c834b6 Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15361) 
   * dfadbc7d9c1b499d518a6af3d8b78cf169e3a0c6 UNKNOWN
   * ab98a8996106108a6afad74de71c69f229cd550d UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] hudi-bot commented on pull request #7982: [HUDI-5813] Error table support for error events

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #7982:
URL: https://github.com/apache/hudi/pull/7982#issuecomment-1439916747

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "a5cad5f60143509cf837f8b01467b660fefa8f77",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15246",
       "triggerID" : "a5cad5f60143509cf837f8b01467b660fefa8f77",
       "triggerType" : "PUSH"
     }, {
       "hash" : "30f2fc136656bbe0567dd7c0cf15916e0cc06af4",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15278",
       "triggerID" : "30f2fc136656bbe0567dd7c0cf15916e0cc06af4",
       "triggerType" : "PUSH"
     }, {
       "hash" : "1b159da06fc8f6521ecfb5e047955e0ba3156730",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15296",
       "triggerID" : "1b159da06fc8f6521ecfb5e047955e0ba3156730",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f705e62c81684f03d025c614a180673bc16da74b",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15314",
       "triggerID" : "f705e62c81684f03d025c614a180673bc16da74b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "667b1ef4eb7d9e344119d63f75bdcb8ccd1664b7",
       "status" : "CANCELED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15338",
       "triggerID" : "667b1ef4eb7d9e344119d63f75bdcb8ccd1664b7",
       "triggerType" : "PUSH"
     }, {
       "hash" : "b6ef7c09ed66456fa285d6136360ed35087c3de7",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15339",
       "triggerID" : "b6ef7c09ed66456fa285d6136360ed35087c3de7",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 667b1ef4eb7d9e344119d63f75bdcb8ccd1664b7 Azure: [CANCELED](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15338) 
   * b6ef7c09ed66456fa285d6136360ed35087c3de7 Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15339) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>




[GitHub] [hudi] lokeshj1703 commented on a diff in pull request #7982: [HUDI-5813] Quarantine table support for error events

Posted by "lokeshj1703 (via GitHub)" <gi...@apache.org>.
lokeshj1703 commented on code in PR #7982:
URL: https://github.com/apache/hudi/pull/7982#discussion_r1114048696


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieQuarantineTableConfig.java:
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.config;
+
+import org.apache.hudi.common.config.ConfigClassProperty;
+import org.apache.hudi.common.config.ConfigGroups;
+import org.apache.hudi.common.config.ConfigProperty;
+
+import javax.annotation.concurrent.Immutable;
+
+@Immutable
+@ConfigClassProperty(name = "Quarantine table Configs",
+    groupName = ConfigGroups.Names.WRITE_CLIENT,
+    description = "Configurations that are required for Quarantine table configs")
+public class HoodieQuarantineTableConfig {
+  public static final ConfigProperty<Boolean> QUARANTINE_TABLE_ENABLED = ConfigProperty
+      .key("hoodie.deltastreamer.quarantinetable.enable")
+      .defaultValue(false)
+      .withDocumentation("config to enable quarantine table");
+
+  public static final ConfigProperty<String> QUARANTINE_TABLE_BASE_PATH = ConfigProperty
+      .key("hoodie.deltastreamer.quarantinetable.base.path")
+      .noDefaultValue()
+      .withDocumentation("base path for quarantine table");
+
+  public static final ConfigProperty<String> QUARANTINE_TARGET_TABLE = ConfigProperty
+      .key("hoodie.deltastreamer.quarantinetable.target.table.name")

Review Comment:
   Added more description.
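
   For reference, enabling the quarantine table via these configs in a deltastreamer properties file might look like the following sketch. The keys are taken from the diff above; the base path and table name values are placeholders, not defaults:

   ```properties
   # Enable routing of error events to a quarantine table (disabled by default)
   hoodie.deltastreamer.quarantinetable.enable=true
   # Placeholder storage location for the quarantine table
   hoodie.deltastreamer.quarantinetable.base.path=s3://my-bucket/hudi/orders_quarantine
   # Placeholder name for the quarantine target table
   hoodie.deltastreamer.quarantinetable.target.table.name=orders_quarantine
   ```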



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/BaseQuarantineTableWriter.java:
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.deltastreamer;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SparkSession;
+
+public abstract class BaseQuarantineTableWriter<T extends QuarantineEvent> {
+
+  public static String QUARANTINE_PAYLOAD_CLASS = "org.apache.hudi.common.model.OverwriteWithLatestAvroPayload";
+
+  public static String QUARANTINE_TABLE_CURRUPT_RECORD_COL_NAME = "_corrupt_record";

Review Comment:
   Fixed.





[GitHub] [hudi] lokeshj1703 commented on a diff in pull request #7982: [HUDI-5813] Error table support for error events

Posted by "lokeshj1703 (via GitHub)" <gi...@apache.org>.
lokeshj1703 commented on code in PR #7982:
URL: https://github.com/apache/hudi/pull/7982#discussion_r1118698503


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/ErrorEvent.java:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.deltastreamer;
+
+public class ErrorEvent<T> {
+
+  ErrorReason reason;
+  T payload;
+
+  public ErrorEvent(T payload, ErrorReason reason) {
+    this.payload = payload;
+    this.reason = reason;
+  }
+
+  public T getPayload() {
+    return payload;
+  }
+
+  public ErrorReason getReason() {
+    return reason;
+  }
+
+  public enum ErrorReason {
+    JSON_AVRO_DESERIALIZATION_FAILURE,
+    JSON_ROW_DESERIALIZATION_FAILURE,
+    AVRO_DESERIALIZATION_FAILURE,

Review Comment:
   Addressed.
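
   To make the class above concrete, here is a compilable, stand-alone copy of `ErrorEvent<T>` with a small usage example. It mirrors the diff; the enum is trimmed to the reasons shown, and the sample payload is a made-up malformed record:

   ```java
   // Stand-alone mirror of the ErrorEvent<T> class from the diff above.
   public class ErrorEvent<T> {

     public enum ErrorReason {
       JSON_AVRO_DESERIALIZATION_FAILURE,
       JSON_ROW_DESERIALIZATION_FAILURE,
       AVRO_DESERIALIZATION_FAILURE
     }

     private final ErrorReason reason;
     private final T payload;

     public ErrorEvent(T payload, ErrorReason reason) {
       this.payload = payload;
       this.reason = reason;
     }

     public T getPayload() {
       return payload;
     }

     public ErrorReason getReason() {
       return reason;
     }

     public static void main(String[] args) {
       // Wrap a raw record that failed JSON-to-Avro conversion as an error event,
       // so it can be routed to the error table instead of failing the sync.
       ErrorEvent<String> event = new ErrorEvent<>(
           "{\"id\": 1,, \"name\": \"bad json\"}",
           ErrorReason.JSON_AVRO_DESERIALIZATION_FAILURE);
       System.out.println(event.getReason() + ": " + event.getPayload());
     }
   }
   ```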





[GitHub] [hudi] lokeshj1703 commented on a diff in pull request #7982: [HUDI-5813] Error table support for error events

Posted by "lokeshj1703 (via GitHub)" <gi...@apache.org>.
lokeshj1703 commented on code in PR #7982:
URL: https://github.com/apache/hudi/pull/7982#discussion_r1118513199


##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala:
##########
@@ -116,10 +116,79 @@ object HoodieSparkUtils extends SparkAdapterSupport with SparkVersionsSupport {
     }, SQLConf.get)
   }
 
-  def getCatalystRowSerDe(structType: StructType) : SparkRowSerDe = {
-    sparkAdapter.createSparkRowSerDe(structType)
-  }
-
   private def injectSQLConf[T: ClassTag](rdd: RDD[T], conf: SQLConf): RDD[T] =
     new SQLConfInjectingRDD(rdd, conf)
+
+  def safeCreateRDD(df: DataFrame, structName: String, recordNamespace: String, reconcileToLatestSchema: Boolean,
+                    latestTableSchema: org.apache.hudi.common.util.Option[Schema] = org.apache.hudi.common.util.Option.empty()):
+  Tuple2[RDD[GenericRecord], RDD[String]] = {

Review Comment:
   These APIs are specific to the error table and are called from DeltaSync when the error table is enabled.
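
   The `safeCreateRDD` contract above (a pair of valid records alongside the raw inputs that failed conversion) can be illustrated with a plain-Java sketch. The parser and types here are stand-ins, not Hudi APIs: integers play the role of `GenericRecord`, and `Integer.parseInt` plays the role of Avro conversion.

   ```java
   import java.util.AbstractMap.SimpleEntry;
   import java.util.ArrayList;
   import java.util.List;

   public class SafeConvertSketch {

     // Returns (parsedValues, failedRawInputs), mirroring in spirit the
     // Tuple2[RDD[GenericRecord], RDD[String]] returned by safeCreateRDD:
     // valid records on the left, raw error payloads on the right.
     static SimpleEntry<List<Integer>, List<String>> safeParse(List<String> inputs) {
       List<Integer> good = new ArrayList<>();
       List<String> bad = new ArrayList<>();
       for (String s : inputs) {
         try {
           good.add(Integer.parseInt(s));
         } catch (NumberFormatException e) {
           bad.add(s); // route the raw record to the error side instead of failing the batch
         }
       }
       return new SimpleEntry<>(good, bad);
     }

     public static void main(String[] args) {
       SimpleEntry<List<Integer>, List<String>> result =
           safeParse(List.of("1", "oops", "3"));
       System.out.println(result.getKey() + " / " + result.getValue());
     }
   }
   ```

   The key design point is that a bad record never throws past the conversion boundary; it is captured with its raw payload so the error-table writer can commit it separately.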





[GitHub] [hudi] hudi-bot commented on pull request #7982: [HUDI-5813] Error table support for error events

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #7982:
URL: https://github.com/apache/hudi/pull/7982#issuecomment-1441518724

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "a5cad5f60143509cf837f8b01467b660fefa8f77",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15246",
       "triggerID" : "a5cad5f60143509cf837f8b01467b660fefa8f77",
       "triggerType" : "PUSH"
     }, {
       "hash" : "30f2fc136656bbe0567dd7c0cf15916e0cc06af4",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15278",
       "triggerID" : "30f2fc136656bbe0567dd7c0cf15916e0cc06af4",
       "triggerType" : "PUSH"
     }, {
       "hash" : "1b159da06fc8f6521ecfb5e047955e0ba3156730",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15296",
       "triggerID" : "1b159da06fc8f6521ecfb5e047955e0ba3156730",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f705e62c81684f03d025c614a180673bc16da74b",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15314",
       "triggerID" : "f705e62c81684f03d025c614a180673bc16da74b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "667b1ef4eb7d9e344119d63f75bdcb8ccd1664b7",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15338",
       "triggerID" : "667b1ef4eb7d9e344119d63f75bdcb8ccd1664b7",
       "triggerType" : "PUSH"
     }, {
       "hash" : "b6ef7c09ed66456fa285d6136360ed35087c3de7",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15339",
       "triggerID" : "b6ef7c09ed66456fa285d6136360ed35087c3de7",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5e157e5bd199cc6e4828f00cb4b81d095eee3e5a",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "5e157e5bd199cc6e4828f00cb4b81d095eee3e5a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "016ef6eb042f6181d35095866978a9d4744c7b14",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "016ef6eb042f6181d35095866978a9d4744c7b14",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * b6ef7c09ed66456fa285d6136360ed35087c3de7 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15339) 
   * 5e157e5bd199cc6e4828f00cb4b81d095eee3e5a UNKNOWN
   * 016ef6eb042f6181d35095866978a9d4744c7b14 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>




[GitHub] [hudi] hudi-bot commented on pull request #7982: [HUDI-5813] Error table support for error events

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #7982:
URL: https://github.com/apache/hudi/pull/7982#issuecomment-1440205127

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "a5cad5f60143509cf837f8b01467b660fefa8f77",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15246",
       "triggerID" : "a5cad5f60143509cf837f8b01467b660fefa8f77",
       "triggerType" : "PUSH"
     }, {
       "hash" : "30f2fc136656bbe0567dd7c0cf15916e0cc06af4",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15278",
       "triggerID" : "30f2fc136656bbe0567dd7c0cf15916e0cc06af4",
       "triggerType" : "PUSH"
     }, {
       "hash" : "1b159da06fc8f6521ecfb5e047955e0ba3156730",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15296",
       "triggerID" : "1b159da06fc8f6521ecfb5e047955e0ba3156730",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f705e62c81684f03d025c614a180673bc16da74b",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15314",
       "triggerID" : "f705e62c81684f03d025c614a180673bc16da74b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "667b1ef4eb7d9e344119d63f75bdcb8ccd1664b7",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15338",
       "triggerID" : "667b1ef4eb7d9e344119d63f75bdcb8ccd1664b7",
       "triggerType" : "PUSH"
     }, {
       "hash" : "b6ef7c09ed66456fa285d6136360ed35087c3de7",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15339",
       "triggerID" : "b6ef7c09ed66456fa285d6136360ed35087c3de7",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * b6ef7c09ed66456fa285d6136360ed35087c3de7 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15339) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>




[GitHub] [hudi] hudi-bot commented on pull request #7982: [HUDI-5813] Error table support for error events

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #7982:
URL: https://github.com/apache/hudi/pull/7982#issuecomment-1441585782

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "a5cad5f60143509cf837f8b01467b660fefa8f77",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15246",
       "triggerID" : "a5cad5f60143509cf837f8b01467b660fefa8f77",
       "triggerType" : "PUSH"
     }, {
       "hash" : "30f2fc136656bbe0567dd7c0cf15916e0cc06af4",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15278",
       "triggerID" : "30f2fc136656bbe0567dd7c0cf15916e0cc06af4",
       "triggerType" : "PUSH"
     }, {
       "hash" : "1b159da06fc8f6521ecfb5e047955e0ba3156730",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15296",
       "triggerID" : "1b159da06fc8f6521ecfb5e047955e0ba3156730",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f705e62c81684f03d025c614a180673bc16da74b",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15314",
       "triggerID" : "f705e62c81684f03d025c614a180673bc16da74b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "667b1ef4eb7d9e344119d63f75bdcb8ccd1664b7",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15338",
       "triggerID" : "667b1ef4eb7d9e344119d63f75bdcb8ccd1664b7",
       "triggerType" : "PUSH"
     }, {
       "hash" : "b6ef7c09ed66456fa285d6136360ed35087c3de7",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15339",
       "triggerID" : "b6ef7c09ed66456fa285d6136360ed35087c3de7",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5e157e5bd199cc6e4828f00cb4b81d095eee3e5a",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "5e157e5bd199cc6e4828f00cb4b81d095eee3e5a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "016ef6eb042f6181d35095866978a9d4744c7b14",
       "status" : "CANCELED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15359",
       "triggerID" : "016ef6eb042f6181d35095866978a9d4744c7b14",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8ae41d105c88a5e21ac4692ace7f4b2cd4c834b6",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "8ae41d105c88a5e21ac4692ace7f4b2cd4c834b6",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 5e157e5bd199cc6e4828f00cb4b81d095eee3e5a UNKNOWN
   * 016ef6eb042f6181d35095866978a9d4744c7b14 Azure: [CANCELED](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15359) 
   * 8ae41d105c88a5e21ac4692ace7f4b2cd4c834b6 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>




[GitHub] [hudi] hudi-bot commented on pull request #7982: [HUDI-5813] Error table support for error events

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #7982:
URL: https://github.com/apache/hudi/pull/7982#issuecomment-1447116827

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "a5cad5f60143509cf837f8b01467b660fefa8f77",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15246",
       "triggerID" : "a5cad5f60143509cf837f8b01467b660fefa8f77",
       "triggerType" : "PUSH"
     }, {
       "hash" : "30f2fc136656bbe0567dd7c0cf15916e0cc06af4",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15278",
       "triggerID" : "30f2fc136656bbe0567dd7c0cf15916e0cc06af4",
       "triggerType" : "PUSH"
     }, {
       "hash" : "1b159da06fc8f6521ecfb5e047955e0ba3156730",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15296",
       "triggerID" : "1b159da06fc8f6521ecfb5e047955e0ba3156730",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f705e62c81684f03d025c614a180673bc16da74b",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15314",
       "triggerID" : "f705e62c81684f03d025c614a180673bc16da74b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "667b1ef4eb7d9e344119d63f75bdcb8ccd1664b7",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15338",
       "triggerID" : "667b1ef4eb7d9e344119d63f75bdcb8ccd1664b7",
       "triggerType" : "PUSH"
     }, {
       "hash" : "b6ef7c09ed66456fa285d6136360ed35087c3de7",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15339",
       "triggerID" : "b6ef7c09ed66456fa285d6136360ed35087c3de7",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5e157e5bd199cc6e4828f00cb4b81d095eee3e5a",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "5e157e5bd199cc6e4828f00cb4b81d095eee3e5a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "016ef6eb042f6181d35095866978a9d4744c7b14",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15359",
       "triggerID" : "016ef6eb042f6181d35095866978a9d4744c7b14",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8ae41d105c88a5e21ac4692ace7f4b2cd4c834b6",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15361",
       "triggerID" : "8ae41d105c88a5e21ac4692ace7f4b2cd4c834b6",
       "triggerType" : "PUSH"
     }, {
       "hash" : "dfadbc7d9c1b499d518a6af3d8b78cf169e3a0c6",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "dfadbc7d9c1b499d518a6af3d8b78cf169e3a0c6",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ab98a8996106108a6afad74de71c69f229cd550d",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15442",
       "triggerID" : "ab98a8996106108a6afad74de71c69f229cd550d",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 5e157e5bd199cc6e4828f00cb4b81d095eee3e5a UNKNOWN
   * dfadbc7d9c1b499d518a6af3d8b78cf169e3a0c6 UNKNOWN
   * ab98a8996106108a6afad74de71c69f229cd550d Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15442) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>




[GitHub] [hudi] nsivabalan commented on a diff in pull request #7982: [HUDI-5813] Error table support for error events

Posted by "nsivabalan (via GitHub)" <gi...@apache.org>.
nsivabalan commented on code in PR #7982:
URL: https://github.com/apache/hudi/pull/7982#discussion_r1119550666


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/BaseQuarantineTableWriter.java:
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.deltastreamer;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SparkSession;
+
+public abstract class BaseQuarantineTableWriter<T extends QuarantineEvent> {

Review Comment:
   Auditing could mean different things IMO. Error table sounds OK to me. Auditing could mean adding events/logs for all operations done to the table, so when we add such auditing support, I assume its interface could be different as well; it is likely about sending event notifications. I am not fully convinced that we would use the same interface for both routing error records and auditing purposes.
   





[GitHub] [hudi] codope commented on a diff in pull request #7982: [HUDI-5813] Quarantine table support for error events

Posted by "codope (via GitHub)" <gi...@apache.org>.
codope commented on code in PR #7982:
URL: https://github.com/apache/hudi/pull/7982#discussion_r1109654706


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/BaseQuarantineTableWriter.java:
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.deltastreamer;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SparkSession;
+
+public abstract class BaseQuarantineTableWriter<T extends QuarantineEvent> {

Review Comment:
   +1





[GitHub] [hudi] hudi-bot commented on pull request #7982: [HUDI-5813] Error table support for error events

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #7982:
URL: https://github.com/apache/hudi/pull/7982#issuecomment-1441574460

   ## CI report:
   
   * b6ef7c09ed66456fa285d6136360ed35087c3de7 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15339) 
   * 5e157e5bd199cc6e4828f00cb4b81d095eee3e5a UNKNOWN
   * 016ef6eb042f6181d35095866978a9d4744c7b14 Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15359) 
   * 8ae41d105c88a5e21ac4692ace7f4b2cd4c834b6 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>




[GitHub] [hudi] hudi-bot commented on pull request #7982: [HUDI-5813] Error table support for error events

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #7982:
URL: https://github.com/apache/hudi/pull/7982#issuecomment-1439906889

   ## CI report:
   
   * f705e62c81684f03d025c614a180673bc16da74b Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15314) 
   * 667b1ef4eb7d9e344119d63f75bdcb8ccd1664b7 Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15338) 
   * b6ef7c09ed66456fa285d6136360ed35087c3de7 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>




[GitHub] [hudi] hudi-bot commented on pull request #7982: [HUDI-5813] Quarantine table support for error events

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #7982:
URL: https://github.com/apache/hudi/pull/7982#issuecomment-1438455933

   ## CI report:
   
   * 1b159da06fc8f6521ecfb5e047955e0ba3156730 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15296) 
   * f705e62c81684f03d025c614a180673bc16da74b UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>




[GitHub] [hudi] hudi-bot commented on pull request #7982: [HUDI-5813] Quarantine table support for error events

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #7982:
URL: https://github.com/apache/hudi/pull/7982#issuecomment-1438736416

   ## CI report:
   
   * f705e62c81684f03d025c614a180673bc16da74b Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15314) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>




[GitHub] [hudi] lokeshj1703 commented on a diff in pull request #7982: [HUDI-5813] Error table support for error events

Posted by "lokeshj1703 (via GitHub)" <gi...@apache.org>.
lokeshj1703 commented on code in PR #7982:
URL: https://github.com/apache/hudi/pull/7982#discussion_r1118696742


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java:
##########
@@ -494,9 +505,38 @@ private Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> fetchFromSourc
       Option<Dataset<Row>> transformed =
           dataAndCheckpoint.getBatch().map(data -> transformer.get().apply(jssc, sparkSession, data, props));
 
+      transformed = formatAdapter.processErrorEvents(transformed,
+          ErrorEvent.ErrorReason.CUSTOM_TRANSFORMER_FAILURE);
+
       checkpointStr = dataAndCheckpoint.getCheckpointForNextBatch();
       boolean reconcileSchema = props.getBoolean(DataSourceWriteOptions.RECONCILE_SCHEMA().key());
       if (this.userProvidedSchemaProvider != null && this.userProvidedSchemaProvider.getTargetSchema() != null) {
+        // If the target schema is specified through Avro schema,
+        // pass in the schema for the Row-to-Avro conversion
+        // to avoid nullability mismatch between Avro schema and Row schema
+        Option<BaseErrorTableWriter> schemaValidationErrorWriter =

Review Comment:
   Refactored it.





[GitHub] [hudi] hudi-bot commented on pull request #7982: [HUDI-5813] Error table support for error events

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #7982:
URL: https://github.com/apache/hudi/pull/7982#issuecomment-1447604612

   ## CI report:
   
   * 5e157e5bd199cc6e4828f00cb4b81d095eee3e5a UNKNOWN
   * dfadbc7d9c1b499d518a6af3d8b78cf169e3a0c6 UNKNOWN
   * ab98a8996106108a6afad74de71c69f229cd550d Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15442) 
   * 242735ddb5c277194fc7a6ea42b8dd0ca9bbe594 Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15466) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>




[GitHub] [hudi] lokeshj1703 commented on a diff in pull request #7982: [HUDI-5813] Error table support for error events

Posted by "lokeshj1703 (via GitHub)" <gi...@apache.org>.
lokeshj1703 commented on code in PR #7982:
URL: https://github.com/apache/hudi/pull/7982#discussion_r1118699762


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieErrorTableConfig.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.config;
+
+import org.apache.hudi.common.config.ConfigClassProperty;
+import org.apache.hudi.common.config.ConfigGroups;
+import org.apache.hudi.common.config.ConfigProperty;
+
+import javax.annotation.concurrent.Immutable;
+
+import java.util.Arrays;
+
+@Immutable
+@ConfigClassProperty(name = "Error table Configs",
+    groupName = ConfigGroups.Names.WRITE_CLIENT,
+    description = "Configurations that are required for Error table configs")
+public class HoodieErrorTableConfig {
+  public static final ConfigProperty<Boolean> ERROR_TABLE_ENABLED = ConfigProperty
+      .key("hoodie.deltastreamer.errortable.enable")
+      .defaultValue(false)
+      .withDocumentation("Config to enable error table. If the config is enabled, "
+          + "all the records with processing error in DeltaStreamer are transferred to error table.");
+
+  public static final ConfigProperty<String> ERROR_TABLE_BASE_PATH = ConfigProperty
+      .key("hoodie.deltastreamer.errortable.base.path")
+      .noDefaultValue()
+      .withDocumentation("Base path for error table under which all error records "
+          + "would be stored.");
+
+  public static final ConfigProperty<String> ERROR_TARGET_TABLE = ConfigProperty
+      .key("hoodie.deltastreamer.errortable.target.table.name")
+      .noDefaultValue()
+      .withDocumentation("Table name to be used for the error table");
+
+  public static final ConfigProperty<Integer> ERROR_TABLE_UPSERT_PARALLELISM_VALUE = ConfigProperty
+      .key("hoodie.deltastreamer.errortable.upsert.shuffle.parallelism")
+      .defaultValue(200)
+      .withDocumentation("Config to set upsert shuffle parallelism. The config is similar to "
+          + "hoodie.upsert.shuffle.parallelism config but applies to the error table.");
+
+  public static final ConfigProperty<Integer> ERROR_TABLE_INSERT_PARALLELISM_VALUE = ConfigProperty
+      .key("hoodie.deltastreamer.errortable.insert.shuffle.parallelism")
+      .defaultValue(200)
+      .withDocumentation("Config to set insert shuffle parallelism. The config is similar to "
+          + "hoodie.insert.shuffle.parallelism config but applies to the error table.");
+
+  public static final ConfigProperty<String> ERROR_TABLE_WRITE_CLASS = ConfigProperty
+      .key("hoodie.deltastreamer.errortable.write.class")
+      .noDefaultValue()
+      .withDocumentation("Class which handles the error table writes. This config is used to configure "
+          + "a custom implementation for Error Table Writer. Specify the full class name of the custom "
+          + "error table writer as a value for this config");
+
+  public static final ConfigProperty<Boolean> ERROR_ENABLE_VALIDATE_TARGET_SCHEMA = ConfigProperty
+      .key("hoodie.deltastreamer.errortable.validate.targetschema.enable")
+      .defaultValue(false)
+      .withDocumentation("Records with schema mismatch with Target Schema are sent to Error Table.");
+
+  public static final ConfigProperty<String> ERROR_TABLE_WRITE_FAILURE_STRATEGY = ConfigProperty
+      .key("hoodie.deltastreamer.errortable.write.failure.strategy")
+      .defaultValue(ErrorWriteFailureStrategy.ROLLBACK_COMMIT.name())
+      .withDocumentation("The config specifies the failure strategy if error table write fails. "
+          + "Use one of - " + Arrays.toString(ErrorWriteFailureStrategy.values()));

Review Comment:
   I think we can have a single config and maybe improve the APIs so that these scenarios can be handled. We can declare the API to throw an exception; a try-catch in the caller would then handle uncaught exceptions as well.
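
   As a rough sketch of the single-config idea above (declare the write API to throw, and let a caller-side try-catch map the configured failure strategy to an action), the following self-contained Java is illustrative only. The enum mirrors `ErrorWriteFailureStrategy` referenced by the diff; `handleFailure` is a hypothetical helper, not Hudi code.

   ```java
   // Illustrative only: a caller-side dispatch on the configured failure strategy.
   // The enum mirrors ErrorWriteFailureStrategy referenced by
   // hoodie.deltastreamer.errortable.write.failure.strategy; handleFailure is hypothetical.
   public class ErrorTableFailureSketch {

     enum ErrorWriteFailureStrategy { ROLLBACK_COMMIT, LOG_ERROR }

     // Returns true if the main delta-sync commit should be failed and rolled back.
     static boolean handleFailure(ErrorWriteFailureStrategy strategy, Exception cause) {
       switch (strategy) {
         case ROLLBACK_COMMIT:
           // Propagate the failure so the main-table commit is rolled back.
           return true;
         case LOG_ERROR:
           // Log and continue; the main-table commit proceeds.
           System.err.println("Error table write failed, continuing: " + cause.getMessage());
           return false;
         default:
           throw new IllegalStateException("Unknown strategy: " + strategy);
       }
     }

     public static void main(String[] args) {
       boolean abort;
       try {
         // Simulate an error-table write failure surfaced as a thrown exception.
         throw new RuntimeException("simulated error table write failure");
       } catch (Exception e) {
         abort = handleFailure(ErrorWriteFailureStrategy.LOG_ERROR, e);
       }
       System.out.println("abort main commit? " + abort);
     }
   }
   ```

   With a single strategy config, the same try-catch covers both deliberate write failures and unexpected exceptions from the error-table writer.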





[GitHub] [hudi] nsivabalan commented on a diff in pull request #7982: [HUDI-5813] Quarantine table support for error events

Posted by "nsivabalan (via GitHub)" <gi...@apache.org>.
nsivabalan commented on code in PR #7982:
URL: https://github.com/apache/hudi/pull/7982#discussion_r1108765957


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieQuarantineTableConfig.java:
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.config;
+
+import org.apache.hudi.common.config.ConfigClassProperty;
+import org.apache.hudi.common.config.ConfigGroups;
+import org.apache.hudi.common.config.ConfigProperty;
+
+import javax.annotation.concurrent.Immutable;
+
+@Immutable
+@ConfigClassProperty(name = "Quarantine table Configs",
+    groupName = ConfigGroups.Names.WRITE_CLIENT,
+    description = "Configurations that are required for Quarantine table configs")
+public class HoodieQuarantineTableConfig {
+  public static final ConfigProperty<Boolean> QUARANTINE_TABLE_ENABLED = ConfigProperty
+      .key("hoodie.deltastreamer.quarantinetable.enable")
+      .defaultValue(false)
+      .withDocumentation("config to enable quarantine table");
+
+  public static final ConfigProperty<String> QUARANTINE_TABLE_BASE_PATH = ConfigProperty
+      .key("hoodie.deltastreamer.quarantinetable.base.path")
+      .noDefaultValue()
+      .withDocumentation("base path for quarantine table");
+
+  public static final ConfigProperty<String> QUARANTINE_TARGET_TABLE = ConfigProperty
+      .key("hoodie.deltastreamer.quarantinetable.target.table.name")

Review Comment:
   Do we have any concrete impl as part of this patch?
   If not, I assume these configs are unused as of now; wondering if it's worth adding them without any use.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieQuarantineTableConfig.java:
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.config;
+
+import org.apache.hudi.common.config.ConfigClassProperty;
+import org.apache.hudi.common.config.ConfigGroups;
+import org.apache.hudi.common.config.ConfigProperty;
+
+import javax.annotation.concurrent.Immutable;
+
+@Immutable
+@ConfigClassProperty(name = "Quarantine table Configs",
+    groupName = ConfigGroups.Names.WRITE_CLIENT,
+    description = "Configurations that are required for Quarantine table configs")
+public class HoodieQuarantineTableConfig {
+  public static final ConfigProperty<Boolean> QUARANTINE_TABLE_ENABLED = ConfigProperty
+      .key("hoodie.deltastreamer.quarantinetable.enable")
+      .defaultValue(false)
+      .withDocumentation("config to enable quarantine table");
+
+  public static final ConfigProperty<String> QUARANTINE_TABLE_BASE_PATH = ConfigProperty
+      .key("hoodie.deltastreamer.quarantinetable.base.path")
+      .noDefaultValue()
+      .withDocumentation("base path for quarantine table");
+
+  public static final ConfigProperty<String> QUARANTINE_TARGET_TABLE = ConfigProperty
+      .key("hoodie.deltastreamer.quarantinetable.target.table.name")
+      .noDefaultValue()
+      .withDocumentation("target table name for quarantine table");
+
+  public static final ConfigProperty<Integer> QUARANTINE_TABLE_UPSERT_PARALLELISM_VALUE = ConfigProperty
+      .key("hoodie.deltastreamer.quarantinetable.upsert.shuffle.parallelism")
+      .defaultValue(200)
+      .withDocumentation("config to set upsert shuffle parallelism");
+
+  public static final ConfigProperty<Integer> QUARANTINE_TABLE_INSERT_PARALLELISM_VALUE = ConfigProperty
+      .key("hoodie.deltastreamer.quarantinetable.insert.shuffle.parallelism")
+      .defaultValue(200)
+      .withDocumentation("config to set insert shuffle parallelism");
+
+  public static final ConfigProperty<String> QUARANTINE_TABLE_WRITER_CLASS = ConfigProperty
+      .key("hoodie.deltastreamer.quarantinetable.writer.class")
+      .noDefaultValue()
+      .withDocumentation("Class which handles the quarantine table writes");

Review Comment:
   we need more docs around this
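
   For illustration only, enabling the quarantine table from a DeltaStreamer properties file might look like the following; the config keys are taken from this diff, but the base path and table name are hypothetical placeholders:

   ```properties
   # Hypothetical example; keys are from HoodieQuarantineTableConfig in this diff.
   hoodie.deltastreamer.quarantinetable.enable=true
   hoodie.deltastreamer.quarantinetable.base.path=/tmp/hudi/quarantine
   hoodie.deltastreamer.quarantinetable.target.table.name=orders_quarantine
   hoodie.deltastreamer.quarantinetable.upsert.shuffle.parallelism=200
   hoodie.deltastreamer.quarantinetable.insert.shuffle.parallelism=200
   ```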



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/BaseQuarantineTableWriter.java:
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.deltastreamer;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SparkSession;
+
+public abstract class BaseQuarantineTableWriter<T extends QuarantineEvent> {
+
+  public static String QUARANTINE_PAYLOAD_CLASS = "org.apache.hudi.common.model.OverwriteWithLatestAvroPayload";
+
+  public static String QUARANTINE_TABLE_CORRUPT_RECORD_COL_NAME = "_corrupt_record";
+
+  public BaseQuarantineTableWriter(HoodieDeltaStreamer.Config cfg, SparkSession sparkSession,
+                                   TypedProperties props, JavaSparkContext jssc, FileSystem fs) {
+  }
+
+  public abstract HoodieWriteConfig getQuarantineTableWriteConfig();
+
+  public abstract HoodieDeltaStreamer.Config getSourceDeltaStreamerConfig();
+
+  /***
+   *
+   * @param errorEvent
+   */
+  public abstract void addErrorEvents(JavaRDD<T> errorEvent);

Review Comment:
   we need more docs for such public interfaces



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/BaseQuarantineTableWriter.java:
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.deltastreamer;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SparkSession;
+
+public abstract class BaseQuarantineTableWriter<T extends QuarantineEvent> {

Review Comment:
   java docs for public interface



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java:
##########
@@ -710,6 +738,18 @@ private Pair<Option<String>, JavaRDD<WriteStatus>> writeToSink(JavaRDD<HoodieRec
             + totalErrorRecords + "/" + totalRecords);
       }
       String commitActionType = CommitUtils.getCommitActionType(cfg.operation, HoodieTableType.valueOf(cfg.tableType));
+      if (quarantineTableWriterInterfaceImpl.isPresent()) {
+        String quarantineTableStartInstant = quarantineTableWriterInterfaceImpl.get().startCommit();
+        quarantineTableWriterInterfaceImpl.get().addErrorEvents(getErrorEventsForWriteStatus(writeStatusRDD));
+        Option<String> committedInstantTime = getLatestInstantWithValidCheckpointInfo(commitTimelineOpt);
+        boolean quarantineTableSuccess = quarantineTableWriterInterfaceImpl.get().upsertAndCommit(quarantineTableStartInstant, instantTime, committedInstantTime);
+        if (!quarantineTableSuccess) {
+          LOG.info("Quarantine Table Commit " + quarantineTableStartInstant + " failed!");
+          LOG.info("Commit " + instantTime + " failed!");
+          writeClient.rollback(instantTime);

Review Comment:
   we should also probably add a config, along with quarantine, to control whether to rollback the original commit or proceed without failing. Maybe some users might feel quarantine is best effort and may not prefer failing the original writes.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/BaseQuarantineTableWriter.java:
##########
@@ -0,0 +1,70 @@
+
+package org.apache.hudi.utilities.deltastreamer;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SparkSession;
+
+public abstract class BaseQuarantineTableWriter<T extends QuarantineEvent> {
+
+  public static String QUARANTINE_PAYLOAD_CLASS = "org.apache.hudi.common.model.OverwriteWithLatestAvroPayload";
+
+  public static String QUARANTINE_TABLE_CURRUPT_RECORD_COL_NAME = "_corrupt_record";

Review Comment:
   docs on what this is about.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java:
##########
@@ -236,6 +239,8 @@ public class DeltaSync implements Serializable, Closeable {
    */
   private transient SparkRDDWriteClient writeClient;
 
+  private Option<BaseQuarantineTableWriter> quarantineTableWriterInterfaceImpl = Option.empty();
+

Review Comment:
   can we call this "quarantineTableWriter"



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java:
##########
@@ -50,10 +61,55 @@ public final class SourceFormatAdapter implements Closeable {
 
   private final Source source;
 
+  private Option<BaseQuarantineTableWriter> quarantineTableWriterInterface = Option.empty();
+
   public SourceFormatAdapter(Source source) {
     this.source = source;
   }
 
+  public SourceFormatAdapter(Source source, Option<BaseQuarantineTableWriter> quarantineTableWriterInterface) {

Review Comment:
   can you chain the constructors? The other constructor (L66) should call this one with Option.empty() as the last arg
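A minimal sketch of the chaining being suggested, in plain Java with `java.util.Optional` standing in for Hudi's `Option` (the class and field names here are illustrative, not the PR's actual code):

```java
import java.util.Optional;

// Illustrative stand-ins; the real types live in hudi-utilities.
class Source {}
class QuarantineWriter {}

public class SourceFormatAdapterSketch {
  private final Source source;
  private final Optional<QuarantineWriter> quarantineTableWriter;

  // Convenience constructor delegates to the full one with Optional.empty(),
  // so field assignment happens in exactly one place.
  public SourceFormatAdapterSketch(Source source) {
    this(source, Optional.empty());
  }

  public SourceFormatAdapterSketch(Source source, Optional<QuarantineWriter> writer) {
    this.source = source;
    this.quarantineTableWriter = writer;
  }

  public boolean hasQuarantineWriter() {
    return quarantineTableWriter.isPresent();
  }

  public static void main(String[] args) {
    // prints false: no writer supplied through the chained constructor
    System.out.println(new SourceFormatAdapterSketch(new Source()).hasQuarantineWriter());
    // prints true: writer supplied through the full constructor
    System.out.println(new SourceFormatAdapterSketch(new Source(),
        Optional.of(new QuarantineWriter())).hasQuarantineWriter());
  }
}
```

Chaining this way avoids duplicating the field assignments across both constructors, which is the point of the review comment.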



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java:
##########
@@ -100,7 +155,9 @@ public InputBatch<JavaRDD<GenericRecord>> fetchNewDataInAvroFormat(Option<String
   public InputBatch<Dataset<Row>> fetchNewDataInRowFormat(Option<String> lastCkptStr, long sourceLimit) {
     switch (source.getSourceType()) {
       case ROW:
-        return ((Source<Dataset<Row>>) source).fetchNext(lastCkptStr, sourceLimit);
+        InputBatch<Dataset<Row>> datasetInputBatch = ((Source<Dataset<Row>>) source).fetchNext(lastCkptStr, sourceLimit);
+        return new InputBatch<>(transformDatasetWithQuarantineEvents(datasetInputBatch.getBatch()),

Review Comment:
   nit: rename to "mayBeProcessQuarantineEvents"



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/BaseQuarantineTableWriter.java:
##########
@@ -0,0 +1,70 @@
+
+package org.apache.hudi.utilities.deltastreamer;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SparkSession;
+
+public abstract class BaseQuarantineTableWriter<T extends QuarantineEvent> {

Review Comment:
   let's call out the sequence of calls showing how this might be used



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] hudi-bot commented on pull request #7982: [HUDI-5813] Error table support for error events

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #7982:
URL: https://github.com/apache/hudi/pull/7982#issuecomment-1446410557

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "a5cad5f60143509cf837f8b01467b660fefa8f77",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15246",
       "triggerID" : "a5cad5f60143509cf837f8b01467b660fefa8f77",
       "triggerType" : "PUSH"
     }, {
       "hash" : "30f2fc136656bbe0567dd7c0cf15916e0cc06af4",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15278",
       "triggerID" : "30f2fc136656bbe0567dd7c0cf15916e0cc06af4",
       "triggerType" : "PUSH"
     }, {
       "hash" : "1b159da06fc8f6521ecfb5e047955e0ba3156730",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15296",
       "triggerID" : "1b159da06fc8f6521ecfb5e047955e0ba3156730",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f705e62c81684f03d025c614a180673bc16da74b",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15314",
       "triggerID" : "f705e62c81684f03d025c614a180673bc16da74b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "667b1ef4eb7d9e344119d63f75bdcb8ccd1664b7",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15338",
       "triggerID" : "667b1ef4eb7d9e344119d63f75bdcb8ccd1664b7",
       "triggerType" : "PUSH"
     }, {
       "hash" : "b6ef7c09ed66456fa285d6136360ed35087c3de7",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15339",
       "triggerID" : "b6ef7c09ed66456fa285d6136360ed35087c3de7",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5e157e5bd199cc6e4828f00cb4b81d095eee3e5a",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "5e157e5bd199cc6e4828f00cb4b81d095eee3e5a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "016ef6eb042f6181d35095866978a9d4744c7b14",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15359",
       "triggerID" : "016ef6eb042f6181d35095866978a9d4744c7b14",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8ae41d105c88a5e21ac4692ace7f4b2cd4c834b6",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15361",
       "triggerID" : "8ae41d105c88a5e21ac4692ace7f4b2cd4c834b6",
       "triggerType" : "PUSH"
     }, {
       "hash" : "dfadbc7d9c1b499d518a6af3d8b78cf169e3a0c6",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "dfadbc7d9c1b499d518a6af3d8b78cf169e3a0c6",
       "triggerType" : "PUSH"
     }, {
       "hash" : "ab98a8996106108a6afad74de71c69f229cd550d",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15442",
       "triggerID" : "ab98a8996106108a6afad74de71c69f229cd550d",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 5e157e5bd199cc6e4828f00cb4b81d095eee3e5a UNKNOWN
   * 8ae41d105c88a5e21ac4692ace7f4b2cd4c834b6 Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15361) 
   * dfadbc7d9c1b499d518a6af3d8b78cf169e3a0c6 UNKNOWN
   * ab98a8996106108a6afad74de71c69f229cd550d Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15442) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>




[GitHub] [hudi] codope commented on a diff in pull request #7982: [HUDI-5813] Quarantine table support for error events

Posted by "codope (via GitHub)" <gi...@apache.org>.
codope commented on code in PR #7982:
URL: https://github.com/apache/hudi/pull/7982#discussion_r1109675049


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/BaseQuarantineTableWriter.java:
##########
@@ -0,0 +1,70 @@
+
+package org.apache.hudi.utilities.deltastreamer;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SparkSession;
+
+public abstract class BaseQuarantineTableWriter<T extends QuarantineEvent> {

Review Comment:
   Should we rename to `BaseHoodieAuditTableWriter`, `HoodieAuditTableConfigs`, `AuditEvent`, `AuditErrorEvent` and `AuditUtils`? Basically, I am suggesting we evolve this into a more general-purpose audit table. Error is just one type of event; others could keep a record of updates/deletions in this audit table.





[GitHub] [hudi] lokeshj1703 commented on a diff in pull request #7982: [HUDI-5813] Quarantine table support for error events

Posted by "lokeshj1703 (via GitHub)" <gi...@apache.org>.
lokeshj1703 commented on code in PR #7982:
URL: https://github.com/apache/hudi/pull/7982#discussion_r1111813017


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/BaseQuarantineTableWriter.java:
##########
@@ -0,0 +1,70 @@
+
+package org.apache.hudi.utilities.deltastreamer;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SparkSession;
+
+public abstract class BaseQuarantineTableWriter<T extends QuarantineEvent> {

Review Comment:
   Addressed in latest commit.





[GitHub] [hudi] lokeshj1703 commented on pull request #7982: [HUDI-5813] Quarantine table support for error events

Posted by "lokeshj1703 (via GitHub)" <gi...@apache.org>.
lokeshj1703 commented on PR #7982:
URL: https://github.com/apache/hudi/pull/7982#issuecomment-1432995591

   Failure in TestJsonKafkaSource#testErrorEventsForDataInRowForamt. Will need to debug that.
   




[GitHub] [hudi] hudi-bot commented on pull request #7982: [HUDI-5813] Quarantine table support for error events

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #7982:
URL: https://github.com/apache/hudi/pull/7982#issuecomment-1438471096

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "a5cad5f60143509cf837f8b01467b660fefa8f77",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15246",
       "triggerID" : "a5cad5f60143509cf837f8b01467b660fefa8f77",
       "triggerType" : "PUSH"
     }, {
       "hash" : "30f2fc136656bbe0567dd7c0cf15916e0cc06af4",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15278",
       "triggerID" : "30f2fc136656bbe0567dd7c0cf15916e0cc06af4",
       "triggerType" : "PUSH"
     }, {
       "hash" : "1b159da06fc8f6521ecfb5e047955e0ba3156730",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15296",
       "triggerID" : "1b159da06fc8f6521ecfb5e047955e0ba3156730",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f705e62c81684f03d025c614a180673bc16da74b",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15314",
       "triggerID" : "f705e62c81684f03d025c614a180673bc16da74b",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 1b159da06fc8f6521ecfb5e047955e0ba3156730 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15296) 
   * f705e62c81684f03d025c614a180673bc16da74b Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15314) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>




[GitHub] [hudi] lokeshj1703 commented on a diff in pull request #7982: [HUDI-5813] Error table support for error events

Posted by "lokeshj1703 (via GitHub)" <gi...@apache.org>.
lokeshj1703 commented on code in PR #7982:
URL: https://github.com/apache/hudi/pull/7982#discussion_r1118695129


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/BaseErrorTableWriter.java:
##########
@@ -0,0 +1,70 @@
+
+package org.apache.hudi.utilities.deltastreamer;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.VisibleForTesting;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SparkSession;
+
+/**
+ * The class which handles error events when the delta streamer syncs data from a source. All
+ * records which the delta streamer is not able to process are sent as error events to
+ * BaseErrorTableWriter. The implementation of BaseErrorTableWriter processes
+ * these error events through the addErrorEvents API and commits them to the error table when
+ * the upsertAndCommit API is called.
+ *
+ * The writer can use the configs defined in HoodieErrorTableConfig to manage the error table.
+ */
+public abstract class BaseErrorTableWriter<T extends ErrorEvent> {
+
+  // The column name passed to Spark for option `columnNameOfCorruptRecord`. The record
+  // is set to this column in case of an error
+  public static String ERROR_TABLE_CURRUPT_RECORD_COL_NAME = "_corrupt_record";
+
+  public BaseErrorTableWriter(HoodieDeltaStreamer.Config cfg, SparkSession sparkSession,

Review Comment:
   Created https://issues.apache.org/jira/browse/HUDI-5858 
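The call sequence described in the javadoc above (buffer error events via addErrorEvents, then flush them together with the base table's instant via upsertAndCommit) can be sketched with a toy in-memory writer. All types below are plain-Java stand-ins for the Hudi/Spark ones, so this is a shape-of-the-API sketch rather than the PR's implementation:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Toy stand-in for ErrorEvent: just the corrupt record payload.
class ToyErrorEvent {
  final String corruptRecord;
  ToyErrorEvent(String corruptRecord) { this.corruptRecord = corruptRecord; }
}

// Minimal in-memory analogue of BaseErrorTableWriter: buffer events via
// addErrorEvents, flush them atomically in upsertAndCommit.
class ToyErrorTableWriter {
  private final List<ToyErrorEvent> pending = new ArrayList<>();
  private final List<ToyErrorEvent> committed = new ArrayList<>();

  public void addErrorEvents(List<ToyErrorEvent> events) {
    pending.addAll(events);
  }

  // baseTableInstant ties the error records to the commit on the base table;
  // a false return signals the caller to apply its failure strategy.
  public boolean upsertAndCommit(String baseTableInstant, Optional<String> lastCommittedInstant) {
    committed.addAll(pending);
    pending.clear();
    return true;
  }

  public int committedCount() { return committed.size(); }
}

public class ErrorTableFlow {
  public static void main(String[] args) {
    ToyErrorTableWriter writer = new ToyErrorTableWriter();
    writer.addErrorEvents(List.of(new ToyErrorEvent("{bad json")));
    writer.addErrorEvents(List.of(new ToyErrorEvent("schema mismatch row")));
    boolean ok = writer.upsertAndCommit("20230216120625", Optional.empty());
    System.out.println(ok + " " + writer.committedCount()); // true 2
  }
}
```

The key property the real writer needs, which the toy preserves, is that events accumulated across the sync are committed as a unit keyed to the base table instant, so a failed error-table commit can be detected and acted on.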





[GitHub] [hudi] lokeshj1703 commented on a diff in pull request #7982: [HUDI-5813] Error table support for error events

Posted by "lokeshj1703 (via GitHub)" <gi...@apache.org>.
lokeshj1703 commented on code in PR #7982:
URL: https://github.com/apache/hudi/pull/7982#discussion_r1118697569


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java:
##########
@@ -710,6 +765,26 @@ private Pair<Option<String>, JavaRDD<WriteStatus>> writeToSink(JavaRDD<HoodieRec
             + totalErrorRecords + "/" + totalRecords);
       }
       String commitActionType = CommitUtils.getCommitActionType(cfg.operation, HoodieTableType.valueOf(cfg.tableType));
+      if (errorTableWriter.isPresent()) {
+        // Removing writeStatus events from error events, as an action on writeStatus can cause the base table DAG to re-execute
+        // if the original cached dataframe gets unpersisted before this action.
+        //        errorTableWriterInterfaceImpl.get().addErrorEvents(getErrorEventsForWriteStatus(writeStatusRDD));
+        Option<String> committedInstantTime = getLatestInstantWithValidCheckpointInfo(commitTimelineOpt);
+        boolean errorTableSuccess = errorTableWriter.get().upsertAndCommit(instantTime, committedInstantTime);
+        if (!errorTableSuccess) {
+          switch (errorWriteFailureStrategy) {
+            case ROLLBACK_COMMIT:
+              LOG.info("Commit " + instantTime + " failed!");
+              writeClient.rollback(instantTime);
+              throw new HoodieException("Error Table Commit failed!");
+            case LOG_ERROR:
+              LOG.error("Error Table write failed for instant " + instantTime);

Review Comment:
   Will need to change the API; it doesn't throw an exception right now.
   Created https://issues.apache.org/jira/browse/HUDI-5858 for it.





[GitHub] [hudi] hudi-bot commented on pull request #7982: [HUDI-5813] Error table support for error events

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #7982:
URL: https://github.com/apache/hudi/pull/7982#issuecomment-1441508138

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "a5cad5f60143509cf837f8b01467b660fefa8f77",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15246",
       "triggerID" : "a5cad5f60143509cf837f8b01467b660fefa8f77",
       "triggerType" : "PUSH"
     }, {
       "hash" : "30f2fc136656bbe0567dd7c0cf15916e0cc06af4",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15278",
       "triggerID" : "30f2fc136656bbe0567dd7c0cf15916e0cc06af4",
       "triggerType" : "PUSH"
     }, {
       "hash" : "1b159da06fc8f6521ecfb5e047955e0ba3156730",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15296",
       "triggerID" : "1b159da06fc8f6521ecfb5e047955e0ba3156730",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f705e62c81684f03d025c614a180673bc16da74b",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15314",
       "triggerID" : "f705e62c81684f03d025c614a180673bc16da74b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "667b1ef4eb7d9e344119d63f75bdcb8ccd1664b7",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15338",
       "triggerID" : "667b1ef4eb7d9e344119d63f75bdcb8ccd1664b7",
       "triggerType" : "PUSH"
     }, {
       "hash" : "b6ef7c09ed66456fa285d6136360ed35087c3de7",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15339",
       "triggerID" : "b6ef7c09ed66456fa285d6136360ed35087c3de7",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5e157e5bd199cc6e4828f00cb4b81d095eee3e5a",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "5e157e5bd199cc6e4828f00cb4b81d095eee3e5a",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * b6ef7c09ed66456fa285d6136360ed35087c3de7 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15339) 
   * 5e157e5bd199cc6e4828f00cb4b81d095eee3e5a UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>




[GitHub] [hudi] lokeshj1703 commented on a diff in pull request #7982: [HUDI-5813] Error table support for error events

Posted by "lokeshj1703 (via GitHub)" <gi...@apache.org>.
lokeshj1703 commented on code in PR #7982:
URL: https://github.com/apache/hudi/pull/7982#discussion_r1118699762


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieErrorTableConfig.java:
##########
@@ -0,0 +1,100 @@
+
+package org.apache.hudi.config;
+
+import org.apache.hudi.common.config.ConfigClassProperty;
+import org.apache.hudi.common.config.ConfigGroups;
+import org.apache.hudi.common.config.ConfigProperty;
+
+import javax.annotation.concurrent.Immutable;
+
+import java.util.Arrays;
+
+@Immutable
+@ConfigClassProperty(name = "Error table Configs",
+    groupName = ConfigGroups.Names.WRITE_CLIENT,
+    description = "Configurations that are required for the error table")
+public class HoodieErrorTableConfig {
+  public static final ConfigProperty<Boolean> ERROR_TABLE_ENABLED = ConfigProperty
+      .key("hoodie.deltastreamer.errortable.enable")
+      .defaultValue(false)
+      .withDocumentation("Config to enable error table. If the config is enabled, "
+          + "all the records with processing error in DeltaStreamer are transferred to error table.");
+
+  public static final ConfigProperty<String> ERROR_TABLE_BASE_PATH = ConfigProperty
+      .key("hoodie.deltastreamer.errortable.base.path")
+      .noDefaultValue()
+      .withDocumentation("Base path for error table under which all error records "
+          + "would be stored.");
+
+  public static final ConfigProperty<String> ERROR_TARGET_TABLE = ConfigProperty
+      .key("hoodie.deltastreamer.errortable.target.table.name")
+      .noDefaultValue()
+      .withDocumentation("Table name to be used for the error table");
+
+  public static final ConfigProperty<Integer> ERROR_TABLE_UPSERT_PARALLELISM_VALUE = ConfigProperty
+      .key("hoodie.deltastreamer.errortable.upsert.shuffle.parallelism")
+      .defaultValue(200)
+      .withDocumentation("Config to set upsert shuffle parallelism. The config is similar to "
+          + "hoodie.upsert.shuffle.parallelism config but applies to the error table.");
+
+  public static final ConfigProperty<Integer> ERROR_TABLE_INSERT_PARALLELISM_VALUE = ConfigProperty
+      .key("hoodie.deltastreamer.errortable.insert.shuffle.parallelism")
+      .defaultValue(200)
+      .withDocumentation("Config to set insert shuffle parallelism. The config is similar to "
+          + "hoodie.insert.shuffle.parallelism config but applies to the error table.");
+
+  public static final ConfigProperty<String> ERROR_TABLE_WRITE_CLASS = ConfigProperty
+      .key("hoodie.deltastreamer.errortable.write.class")
+      .noDefaultValue()
+      .withDocumentation("Class which handles the error table writes. This config is used to configure "
+          + "a custom implementation for Error Table Writer. Specify the full class name of the custom "
+          + "error table writer as a value for this config");
+
+  public static final ConfigProperty<Boolean> ERROR_ENABLE_VALIDATE_TARGET_SCHEMA = ConfigProperty
+      .key("hoodie.deltastreamer.errortable.validate.targetschema.enable")
+      .defaultValue(false)
+      .withDocumentation("Records with schema mismatch with Target Schema are sent to Error Table.");
+
+  public static final ConfigProperty<String> ERROR_TABLE_WRITE_FAILURE_STRATEGY = ConfigProperty
+      .key("hoodie.deltastreamer.errortable.write.failure.strategy")
+      .defaultValue(ErrorWriteFailureStrategy.ROLLBACK_COMMIT.name())
+      .withDocumentation("The config specifies the failure strategy if error table write fails. "
+          + "Use one of - " + Arrays.toString(ErrorWriteFailureStrategy.values()));

Review Comment:
   I think we can have a single config and improve the APIs so that these scenarios are handled. For example, if we declare the API to throw an exception, this case would be covered, because the caller would then need to add a try-catch.
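
A minimal sketch of the suggestion above: if the commit API declares a checked exception, every caller is forced to decide what a failure means at the call site. The interface and method names here are illustrative, not the merged Hudi API.

```java
// Hypothetical sketch: a commit API with a declared checked exception makes
// the caller handle the failure case explicitly instead of relying on a
// second config to describe the behavior.
public class ErrorTableCommitSketch {

    // Stand-in for the error table writer's commit API (name is illustrative).
    interface ErrorTableCommitter {
        void upsertAndCommit(String instantTime) throws Exception;
    }

    // Caller side: the try-catch is impossible to forget once the exception
    // is part of the method signature.
    static String commitWithHandling(ErrorTableCommitter committer, String instantTime) {
        try {
            committer.upsertAndCommit(instantTime);
            return "COMMITTED";
        } catch (Exception e) {
            return "FAILED: " + e.getMessage();
        }
    }
}
```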



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] lokeshj1703 commented on a diff in pull request #7982: [HUDI-5813] Error table support for error events

Posted by "lokeshj1703 (via GitHub)" <gi...@apache.org>.
lokeshj1703 commented on code in PR #7982:
URL: https://github.com/apache/hudi/pull/7982#discussion_r1118698239


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/ErrorEvent.java:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.deltastreamer;
+
+public class ErrorEvent<T> {
+
+  ErrorReason reason;
+  T payload;
+
+  public ErrorEvent(T payload, ErrorReason reason) {
+    this.payload = payload;
+    this.reason = reason;
+  }
+
+  public T getPayload() {
+    return payload;
+  }
+
+  public ErrorReason getReason() {
+    return reason;
+  }
+
+  public enum ErrorReason {

Review Comment:
   Addressed.





[GitHub] [hudi] lokeshj1703 commented on a diff in pull request #7982: [HUDI-5813] Quarantine table support for error events

Posted by "lokeshj1703 (via GitHub)" <gi...@apache.org>.
lokeshj1703 commented on code in PR #7982:
URL: https://github.com/apache/hudi/pull/7982#discussion_r1111813328


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/BaseQuarantineTableWriter.java:
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.deltastreamer;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SparkSession;
+
+public abstract class BaseQuarantineTableWriter<T extends QuarantineEvent> {
+
+  public static String QUARANTINE_PAYLOAD_CLASS = "org.apache.hudi.common.model.OverwriteWithLatestAvroPayload";
+
+  public static String QUARANTINE_TABLE_CURRUPT_RECORD_COL_NAME = "_corrupt_record";
+
+  public BaseQuarantineTableWriter(HoodieDeltaStreamer.Config cfg, SparkSession sparkSession,
+                                   TypedProperties props, JavaSparkContext jssc, FileSystem fs) {
+  }
+
+  public abstract HoodieWriteConfig getQuarantineTableWriteConfig();
+
+  public abstract HoodieDeltaStreamer.Config getSourceDeltaStreamerConfig();
+
+  /***
+   *
+   * @param errorEvent
+   */
+  public abstract void addErrorEvents(JavaRDD<T> errorEvent);

Review Comment:
   Addressed in latest commit.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieQuarantineTableConfig.java:
##########
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.config;
+
+import org.apache.hudi.common.config.ConfigClassProperty;
+import org.apache.hudi.common.config.ConfigGroups;
+import org.apache.hudi.common.config.ConfigProperty;
+
+import javax.annotation.concurrent.Immutable;
+
+@Immutable
+@ConfigClassProperty(name = "Quarantine table Configs",
+    groupName = ConfigGroups.Names.WRITE_CLIENT,
+    description = "Configurations that are required for Quarantine table configs")
+public class HoodieQuarantineTableConfig {
+  public static final ConfigProperty<Boolean> QUARANTINE_TABLE_ENABLED = ConfigProperty
+      .key("hoodie.deltastreamer.quarantinetable.enable")
+      .defaultValue(false)
+      .withDocumentation("config to enable quarantine table");
+
+  public static final ConfigProperty<String> QUARANTINE_TABLE_BASE_PATH = ConfigProperty
+      .key("hoodie.deltastreamer.quarantinetable.base.path")
+      .noDefaultValue()
+      .withDocumentation("base path for quarantine table");
+
+  public static final ConfigProperty<String> QUARANTINE_TARGET_TABLE = ConfigProperty
+      .key("hoodie.deltastreamer.quarantinetable.target.table.name")
+      .noDefaultValue()
+      .withDocumentation("target table name for quarantine table");
+
+  public static final ConfigProperty<Integer> QUARANTINE_TABLE_UPSERT_PARALLELISM_VALUE = ConfigProperty
+      .key("hoodie.deltastreamer.quarantinetable.upsert.shuffle.parallelism")
+      .defaultValue(200)
+      .withDocumentation("config to set upsert shuffle parallelism");
+
+  public static final ConfigProperty<Integer> QUARANTINE_TABLE_INSERT_PARALLELISM_VALUE = ConfigProperty
+      .key("hoodie.deltastreamer.quarantinetable.insert.shuffle.parallelism")
+      .defaultValue(200)
+      .withDocumentation("config to set insert shuffle parallelism");
+
+  public static final ConfigProperty<String> QUARANTINE_TABLE_WRITER_CLASS = ConfigProperty
+      .key("hoodie.deltastreamer.quarantinetable.writer.class")
+      .noDefaultValue()
+      .withDocumentation("Class which handles the quarantine table writes");

Review Comment:
   Addressed in latest commit.





[GitHub] [hudi] lokeshj1703 commented on a diff in pull request #7982: [HUDI-5813] Quarantine table support for error events

Posted by "lokeshj1703 (via GitHub)" <gi...@apache.org>.
lokeshj1703 commented on code in PR #7982:
URL: https://github.com/apache/hudi/pull/7982#discussion_r1111809865


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/QuarantineJsonEvent.java:
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.deltastreamer;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+
+public class QuarantineJsonEvent extends QuarantineEvent<String> {

Review Comment:
   Removed class QuarantineJsonEvent. 



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java:
##########
@@ -746,6 +786,29 @@ private Pair<Option<String>, JavaRDD<WriteStatus>> writeToSink(JavaRDD<HoodieRec
     return Pair.of(scheduledCompactionInstant, writeStatusRDD);
   }
 
+  protected JavaRDD<QuarantineJsonEvent> getErrorEventsForWriteStatus(JavaRDD<WriteStatus> writeStatusRDD) {
+    HoodieWriteConfig config = writeClient.getConfig();
+
+    return writeStatusRDD
+        .filter(WriteStatus::hasErrors)
+        .flatMap(x -> {
+          Schema schema = Schema.parse(config.getSchema());
+          Properties props = config.getPayloadConfig().getProps();
+          return x.getFailedRecords().stream()
+              .map(z -> {
+                HoodieRecordPayload hoodieRecordPayload = (HoodieRecordPayload)z.getData();
+                String recordStr;
+                try {
+                  recordStr = (String) hoodieRecordPayload.getInsertValue(schema,
+                      props).map(value -> value.toString()).get();
+                } catch (IOException e) {
+                  recordStr = null;

Review Comment:
   The function is not used anywhere. Removed it.





[GitHub] [hudi] hudi-bot commented on pull request #7982: [HUDI-5813] Quarantine table support for error events

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #7982:
URL: https://github.com/apache/hudi/pull/7982#issuecomment-1437230309

   ## CI report:
   
   * 1b159da06fc8f6521ecfb5e047955e0ba3156730 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15296) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>




[GitHub] [hudi] nsivabalan commented on a diff in pull request #7982: [HUDI-5813] Error table support for error events

Posted by "nsivabalan (via GitHub)" <gi...@apache.org>.
nsivabalan commented on code in PR #7982:
URL: https://github.com/apache/hudi/pull/7982#discussion_r1118345162


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/BaseErrorTableWriter.java:
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.deltastreamer;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.VisibleForTesting;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SparkSession;
+
+/**
+ * The class which handles error events when delta streamer syncs data from source. All the
+ * records which Delta streamer is not able to process are triggered as error events to
+ * BaseErrorTableWriter. The implementation of BaseErrorTableWriter processes
+ * these error events through addErrorEvents API and commits them to the error table when
+ * upsertAndCommit API is called.
+ *
+ * The writer can use the configs defined in HoodieErrorTableConfig to manage the error table.
+ */
+public abstract class BaseErrorTableWriter<T extends ErrorEvent> {
+
+  // The column name passed to Spark for option `columnNameOfCorruptRecord`. The record
+  // is set to this column in case of an error
+  public static String ERROR_TABLE_CURRUPT_RECORD_COL_NAME = "_corrupt_record";
+
+  public BaseErrorTableWriter(HoodieDeltaStreamer.Config cfg, SparkSession sparkSession,

Review Comment:
   I feel this constructor should just take in properties and not the DeltaStreamer config. This should be very generic: irrespective of whether one writes via DeltaStreamer or spark-ds, they should be able to enable the error table.
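
The reviewer's point can be sketched as a writer constructed from generic properties alone, with no DeltaStreamer-specific config object. The class and key names below are illustrative of the suggestion, not the actual Hudi classes.

```java
import java.util.Properties;

// Hypothetical sketch: the writer depends only on generic properties, so any
// write path (DeltaStreamer, spark-ds, spark-sql) can construct and enable it.
public class PropsOnlyErrorTableWriter {

    private final boolean enabled;
    private final String basePath;

    public PropsOnlyErrorTableWriter(Properties props) {
        // No DeltaStreamer config object is required; key names are assumptions.
        this.enabled = Boolean.parseBoolean(props.getProperty("hoodie.errortable.enable", "false"));
        this.basePath = props.getProperty("hoodie.errortable.base.path");
    }

    public boolean isEnabled() {
        return enabled;
    }

    public String getBasePath() {
        return basePath;
    }
}
```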



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieErrorTableConfig.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.config;
+
+import org.apache.hudi.common.config.ConfigClassProperty;
+import org.apache.hudi.common.config.ConfigGroups;
+import org.apache.hudi.common.config.ConfigProperty;
+
+import javax.annotation.concurrent.Immutable;
+
+import java.util.Arrays;
+
+@Immutable
+@ConfigClassProperty(name = "Error table Configs",
+    groupName = ConfigGroups.Names.WRITE_CLIENT,
+    description = "Configurations that are required for Error table configs")
+public class HoodieErrorTableConfig {
+  public static final ConfigProperty<Boolean> ERROR_TABLE_ENABLED = ConfigProperty
+      .key("hoodie.deltastreamer.errortable.enable")

Review Comment:
   Why is this specific to DeltaStreamer?
   Maybe "hoodie.error.table.enable" is better, so that one can enable the error table for any writes (spark-ds, spark-sql, etc.).
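
One way to make the rename non-breaking is to prefer the writer-agnostic key and fall back to the old deltastreamer-scoped one. This is a sketch of that migration pattern; the key names follow the review suggestion and are not the final config.

```java
import java.util.Properties;

// Sketch: resolve the "enable" flag from a generic key first, then fall back
// to the legacy deltastreamer-scoped key for existing jobs.
public class ErrorTableEnableKey {

    static final String NEW_KEY = "hoodie.error.table.enable";
    static final String OLD_KEY = "hoodie.deltastreamer.errortable.enable";

    static boolean isErrorTableEnabled(Properties props) {
        String value = props.getProperty(NEW_KEY);
        if (value == null) {
            value = props.getProperty(OLD_KEY); // backward-compatible fallback
        }
        return Boolean.parseBoolean(value); // null parses to false, the default
    }
}
```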



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieErrorTableConfig.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.config;
+
+import org.apache.hudi.common.config.ConfigClassProperty;
+import org.apache.hudi.common.config.ConfigGroups;
+import org.apache.hudi.common.config.ConfigProperty;
+
+import javax.annotation.concurrent.Immutable;
+
+import java.util.Arrays;
+
+@Immutable
+@ConfigClassProperty(name = "Error table Configs",
+    groupName = ConfigGroups.Names.WRITE_CLIENT,
+    description = "Configurations that are required for Error table configs")
+public class HoodieErrorTableConfig {
+  public static final ConfigProperty<Boolean> ERROR_TABLE_ENABLED = ConfigProperty
+      .key("hoodie.deltastreamer.errortable.enable")
+      .defaultValue(false)
+      .withDocumentation("Config to enable error table. If the config is enabled, "
+          + "all the records with processing error in DeltaStreamer are transferred to error table.");
+
+  public static final ConfigProperty<String> ERROR_TABLE_BASE_PATH = ConfigProperty
+      .key("hoodie.deltastreamer.errortable.base.path")
+      .noDefaultValue()
+      .withDocumentation("Base path for error table under which all error records "
+          + "would be stored.");
+
+  public static final ConfigProperty<String> ERROR_TARGET_TABLE = ConfigProperty
+      .key("hoodie.deltastreamer.errortable.target.table.name")
+      .noDefaultValue()
+      .withDocumentation("Table name to be used for the error table");
+
+  public static final ConfigProperty<Integer> ERROR_TABLE_UPSERT_PARALLELISM_VALUE = ConfigProperty
+      .key("hoodie.deltastreamer.errortable.upsert.shuffle.parallelism")
+      .defaultValue(200)
+      .withDocumentation("Config to set upsert shuffle parallelism. The config is similar to "
+          + "hoodie.upsert.shuffle.parallelism config but applies to the error table.");
+
+  public static final ConfigProperty<Integer> ERROR_TABLE_INSERT_PARALLELISM_VALUE = ConfigProperty
+      .key("hoodie.deltastreamer.errortable.insert.shuffle.parallelism")
+      .defaultValue(200)
+      .withDocumentation("Config to set insert shuffle parallelism. The config is similar to "
+          + "hoodie.insert.shuffle.parallelism config but applies to the error table.");
+
+  public static final ConfigProperty<String> ERROR_TABLE_WRITE_CLASS = ConfigProperty
+      .key("hoodie.deltastreamer.errortable.write.class")
+      .noDefaultValue()
+      .withDocumentation("Class which handles the error table writes. This config is used to configure "
+          + "a custom implementation for Error Table Writer. Specify the full class name of the custom "
+          + "error table writer as a value for this config");
+
+  public static final ConfigProperty<Boolean> ERROR_ENABLE_VALIDATE_TARGET_SCHEMA = ConfigProperty
+      .key("hoodie.deltastreamer.errortable.validate.targetschema.enable")
+      .defaultValue(false)
+      .withDocumentation("Records with schema mismatch with Target Schema are sent to Error Table.");
+
+  public static final ConfigProperty<String> ERROR_TABLE_WRITE_FAILURE_STRATEGY = ConfigProperty
+      .key("hoodie.deltastreamer.errortable.write.failure.strategy")
+      .defaultValue(ErrorWriteFailureStrategy.ROLLBACK_COMMIT.name())
+      .withDocumentation("The config specifies the failure strategy if error table write fails. "
+          + "Use one of - " + Arrays.toString(ErrorWriteFailureStrategy.values()));
+
+  public enum ErrorWriteFailureStrategy {
+    ROLLBACK_COMMIT("Rollback the corresponding base table write commit for which the error events were triggered"),

Review Comment:
   Instead of ROLLBACK_COMMIT, should we name it FAIL_WRITES? Errors could happen anywhere in the lifecycle, right? "Rollback" somehow suggests the write succeeds and we then roll it back if there are errors.
   What happens if there are error records while consuming from the source, or during transformation?
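
The naming question can be illustrated with a strategy that applies uniformly to any lifecycle stage that produced the errors, rather than only to a completed write. The enum values follow the review suggestion; the dispatch is a sketch, not the merged implementation.

```java
// Sketch: a FAIL_WRITES strategy fails the sync wherever the error table
// write broke (source read, transformation, or write), while LOG_ERROR only
// records the failure and lets the sync proceed.
public class FailureStrategySketch {

    enum ErrorWriteFailureStrategy {
        FAIL_WRITES,
        LOG_ERROR
    }

    static String onErrorTableWriteFailure(ErrorWriteFailureStrategy strategy, String stage) {
        switch (strategy) {
            case FAIL_WRITES:
                // Fail the whole sync regardless of which stage produced the errors.
                throw new RuntimeException("Error table write failed during " + stage);
            case LOG_ERROR:
            default:
                return "logged failure during " + stage;
        }
    }
}
```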



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala:
##########
@@ -116,10 +116,79 @@ object HoodieSparkUtils extends SparkAdapterSupport with SparkVersionsSupport {
     }, SQLConf.get)
   }
 
-  def getCatalystRowSerDe(structType: StructType) : SparkRowSerDe = {
-    sparkAdapter.createSparkRowSerDe(structType)
-  }
-
   private def injectSQLConf[T: ClassTag](rdd: RDD[T], conf: SQLConf): RDD[T] =
     new SQLConfInjectingRDD(rdd, conf)
+
+  def safeCreateRDD(df: DataFrame, structName: String, recordNamespace: String, reconcileToLatestSchema: Boolean,
+                    latestTableSchema: org.apache.hudi.common.util.Option[Schema] = org.apache.hudi.common.util.Option.empty()):
+  Tuple2[RDD[GenericRecord], RDD[String]] = {

Review Comment:
   Are these moved from elsewhere?



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/ErrorEvent.java:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.deltastreamer;
+
+public class ErrorEvent<T> {
+
+  ErrorReason reason;

Review Comment:
   private, final 



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java:
##########
@@ -118,11 +177,24 @@ public InputBatch<Dataset<Row>> fetchNewDataInRowFormat(Option<String> lastCkptS
       case JSON: {
         InputBatch<JavaRDD<String>> r = ((Source<JavaRDD<String>>) source).fetchNext(lastCkptStr, sourceLimit);
         Schema sourceSchema = r.getSchemaProvider().getSourceSchema();
-        StructType dataType = AvroConversionUtils.convertAvroSchemaToStructType(sourceSchema);
-        return new InputBatch<>(
-            Option.ofNullable(
-                r.getBatch().map(rdd -> source.getSparkSession().read().schema(dataType).json(rdd)).orElse(null)),
-            r.getCheckpointForNextBatch(), r.getSchemaProvider());
+        if (errorTableWriterInterface.isPresent()) {
+          StructType dataType = AvroConversionUtils.convertAvroSchemaToStructType(sourceSchema)

Review Comment:
   Add Javadocs wherever we invoke the error table, to explain what kind of processing we are doing.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/BaseErrorTableWriter.java:
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.deltastreamer;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.VisibleForTesting;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SparkSession;
+
+/**
+ * The class which handles error events when delta streamer syncs data from source. All the
+ * records which Delta streamer is not able to process are triggered as error events to
+ * BaseErrorTableWriter. The implementation of BaseErrorTableWriter processes
+ * these error events through addErrorEvents API and commits them to the error table when
+ * upsertAndCommit API is called.
+ *
+ * The writer can use the configs defined in HoodieErrorTableConfig to manage the error table.
+ */
+public abstract class BaseErrorTableWriter<T extends ErrorEvent> {
+
+  // The column name passed to Spark for option `columnNameOfCorruptRecord`. The record
+  // is set to this column in case of an error
+  public static String ERROR_TABLE_CURRUPT_RECORD_COL_NAME = "_corrupt_record";
+
+  public BaseErrorTableWriter(HoodieDeltaStreamer.Config cfg, SparkSession sparkSession,
+                                   TypedProperties props, JavaSparkContext jssc, FileSystem fs) {
+  }
+
+  /**
+   * Processes input error events. These error events would be committed later through upsertAndCommit
+   * API call.
+   *
+   * @param errorEvent Input error event RDD
+   */
+  public abstract void addErrorEvents(JavaRDD<T> errorEvent);
+
+  /**
+   * Fetches the error events RDD processed by the writer so far. This is a test API.
+   */
+  @VisibleForTesting
+  public abstract Option<JavaRDD<HoodieAvroRecord>> getErrorEvents(String baseTableInstantTime, Option<String> commitedInstantTime);
+
+  /**
+   * This API is called to commit the error events (failed Hoodie Records) processed by the writer so far.
+   * These records are committed to a error table.
+   */
+  public abstract boolean upsertAndCommit(String baseTableInstantTime, Option<String> commitedInstantTime);

Review Comment:
   Let's say there are concurrent error events (from the regular writer and from an async table service). How would this pan out? I assume there will be only one instance of the error table writer? Or will there be one per thread?
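
One possible answer to the concurrency question above, sketched under the assumption of a single shared writer instance: the buffer is synchronized so the regular writer and an async table service can add events concurrently, and a single commit drains everything atomically. This is a design assumption for illustration, not the actual implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: one shared writer instance with a synchronized in-memory buffer.
// Concurrent producers call addErrorEvents; the commit path drains the
// buffer atomically.
public class SharedErrorBufferSketch {

    private final List<String> buffer = new ArrayList<>();

    public synchronized void addErrorEvents(List<String> events) {
        buffer.addAll(events);
    }

    // Drains the buffer in one critical section, as a commit would.
    public synchronized List<String> drainForCommit() {
        List<String> snapshot = new ArrayList<>(buffer);
        buffer.clear();
        return snapshot;
    }
}
```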



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/BaseErrorTableWriter.java:
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.deltastreamer;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.VisibleForTesting;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SparkSession;
+
+/**
+ * The class that handles error events when Delta Streamer syncs data from the source. All
+ * records that Delta Streamer is not able to process are forwarded as error events to
+ * BaseErrorTableWriter. The implementation of BaseErrorTableWriter processes
+ * these error events through the addErrorEvents API and commits them to the error table when
+ * the upsertAndCommit API is called.
+ *
+ * The writer can use the configs defined in HoodieErrorTableConfig to manage the error table.
+ */
+public abstract class BaseErrorTableWriter<T extends ErrorEvent> {
+
+  // The column name passed to Spark for option `columnNameOfCorruptRecord`. The record
+  // is set to this column in case of an error
+  public static String ERROR_TABLE_CURRUPT_RECORD_COL_NAME = "_corrupt_record";
+
+  public BaseErrorTableWriter(HoodieDeltaStreamer.Config cfg, SparkSession sparkSession,
+                                   TypedProperties props, JavaSparkContext jssc, FileSystem fs) {
+  }
+
+  /**
+   * Processes input error events. These error events would be committed later through upsertAndCommit
+   * API call.
+   *
+   * @param errorEvent Input error event RDD
+   */
+  public abstract void addErrorEvents(JavaRDD<T> errorEvent);
+
+  /**
+   * Fetches the error events RDD processed by the writer so far. This is a test API.
+   */
+  @VisibleForTesting
+  public abstract Option<JavaRDD<HoodieAvroRecord>> getErrorEvents(String baseTableInstantTime, Option<String> commitedInstantTime);
+
+  /**
+   * This API is called to commit the error events (failed Hoodie Records) processed by the writer so far.
+   * These records are committed to an error table.
+   */
+  public abstract boolean upsertAndCommit(String baseTableInstantTime, Option<String> commitedInstantTime);

Review Comment:
   Don't mean to drag this patch further, but thinking out loud: similar to a typical startTxn, performOperation, and then endTxn or commitTxn (as in db or data-system APIs), does it make sense to add a startCommit API to this interface? I see addErrorEvents mapping to performOperation, and upsertAndCommit mapping to endTxn or commitTxn, but I don't see anything to denote that we are starting a new commit.
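The buffered add-then-commit lifecycle discussed in this comment can be sketched with a minimal in-memory stand-in (the class and method bodies below are hypothetical; the real BaseErrorTableWriter buffers JavaRDDs of error events and upserts them into a Hudi table):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory analogue of the BaseErrorTableWriter call pattern:
// addErrorEvents(...) plays the performOperation role (buffer the batch),
// upsertAndCommit(...) plays the commitTxn role (flush everything buffered).
public class ErrorWriterLifecycleDemo {

    static class InMemoryErrorWriter {
        private final List<String> pending = new ArrayList<>();
        private final List<String> committed = new ArrayList<>();

        // Accumulate error events for the in-flight commit.
        void addErrorEvents(List<String> events) {
            pending.addAll(events);
        }

        // Persist everything buffered since the last commit; returns success.
        boolean upsertAndCommit(String baseTableInstantTime) {
            committed.addAll(pending);
            pending.clear();
            return true;
        }

        List<String> committed() {
            return committed;
        }
    }

    public static void main(String[] args) {
        InMemoryErrorWriter writer = new InMemoryErrorWriter();
        writer.addErrorEvents(List.of("bad-record-1"));
        writer.addErrorEvents(List.of("bad-record-2"));
        boolean ok = writer.upsertAndCommit("20230216120625"); // made-up instant time
        System.out.println(ok + " " + writer.committed());
    }
}
```

Nothing in this pattern marks the start of a commit; a startCommit API would make the transaction boundary explicit, as the comment suggests.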
   



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/BaseErrorTableWriter.java:
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.deltastreamer;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.VisibleForTesting;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SparkSession;
+
+/**
+ * The class that handles error events when Delta Streamer syncs data from the source. All
+ * records that Delta Streamer is not able to process are forwarded as error events to

Review Comment:
   Again, let's keep it agnostic of the write method.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java:
##########
@@ -494,9 +505,38 @@ private Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> fetchFromSourc
       Option<Dataset<Row>> transformed =
           dataAndCheckpoint.getBatch().map(data -> transformer.get().apply(jssc, sparkSession, data, props));
 
+      transformed = formatAdapter.processErrorEvents(transformed,
+          ErrorEvent.ErrorReason.CUSTOM_TRANSFORMER_FAILURE);
+
       checkpointStr = dataAndCheckpoint.getCheckpointForNextBatch();
       boolean reconcileSchema = props.getBoolean(DataSourceWriteOptions.RECONCILE_SCHEMA().key());
       if (this.userProvidedSchemaProvider != null && this.userProvidedSchemaProvider.getTargetSchema() != null) {
+        // If the target schema is specified through Avro schema,
+        // pass in the schema for the Row-to-Avro conversion
+        // to avoid nullability mismatch between Avro schema and Row schema
+        Option<BaseErrorTableWriter> schemaValidationErrorWriter =
+            (errorTableWriter.isPresent()
+                && props.getBoolean(HoodieErrorTableConfig.ERROR_ENABLE_VALIDATE_TARGET_SCHEMA.key(),
+                HoodieErrorTableConfig.ERROR_ENABLE_VALIDATE_TARGET_SCHEMA.defaultValue()))
+                ? errorTableWriter : Option.empty();
+        avroRDDOptional = transformed
+            .map(row ->
+                schemaValidationErrorWriter
+                    .map(impl -> {
+                      Tuple2<RDD<GenericRecord>, RDD<String>> safeCreateRDDs = HoodieSparkUtils.safeCreateRDD(row,
+                          HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE, reconcileSchema,
+                          Option.of(this.userProvidedSchemaProvider.getTargetSchema())
+                      );
+                      impl.addErrorEvents(safeCreateRDDs._2().toJavaRDD()
+                          .map(evStr -> new ErrorEvent<>(evStr,
+                              ErrorEvent.ErrorReason.AVRO_DESERIALIZATION_FAILURE)));
+                      return safeCreateRDDs._1();
+                    })
+                    .orElseGet(() -> HoodieSparkUtils.createRdd(row,
+                      HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE, reconcileSchema,

Review Comment:
   Let's move this to a separate method and reuse it around L 562 as well.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java:
##########
@@ -710,6 +765,26 @@ private Pair<Option<String>, JavaRDD<WriteStatus>> writeToSink(JavaRDD<HoodieRec
             + totalErrorRecords + "/" + totalRecords);
       }
       String commitActionType = CommitUtils.getCommitActionType(cfg.operation, HoodieTableType.valueOf(cfg.tableType));
+      if (errorTableWriter.isPresent()) {
+        // Removing writeStatus events from error events, as action on writeStatus can cause base table DAG to reexecute
+        // if the original cached dataframe gets unpersisted before this action.
+        //        errorTableWriterInterfaceImpl.get().addErrorEvents(getErrorEventsForWriteStatus(writeStatusRDD));
+        Option<String> commitedInstantTime = getLatestInstantWithValidCheckpointInfo(commitTimelineOpt);
+        boolean errorTableSuccess = errorTableWriter.get().upsertAndCommit(instantTime, commitedInstantTime);
+        if (!errorTableSuccess) {
+          switch (errorWriteFailureStrategy) {
+            case ROLLBACK_COMMIT:
+              LOG.info("Commit " + instantTime + " failed!");
+              writeClient.rollback(instantTime);
+              throw new HoodieException("Error Table Commit failed!");
+            case LOG_ERROR:
+              LOG.error("Error Table write failed for instant " + instantTime);

Review Comment:
   Can we log some more context here: the error, the exception stack trace, or some other info?
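On the point above: passing the throwable as the final logger argument preserves the stack trace alongside the message. A self-contained illustration using java.util.logging as a stand-in for the project's logger (the instant time and exception here are made up):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;

public class ErrorLogContextDemo {

    // Captures log records so the example can inspect what was logged.
    static class CapturingHandler extends Handler {
        final List<LogRecord> records = new ArrayList<>();
        @Override public void publish(LogRecord record) { records.add(record); }
        @Override public void flush() { }
        @Override public void close() { }
    }

    public static void main(String[] args) {
        Logger log = Logger.getLogger("ErrorTableDemo");
        log.setUseParentHandlers(false);
        CapturingHandler handler = new CapturingHandler();
        log.addHandler(handler);

        String instantTime = "20230216120625"; // made-up instant time
        try {
            throw new IllegalStateException("error table commit failed");
        } catch (IllegalStateException e) {
            // The throwable argument carries the stack trace into the log record.
            log.log(Level.SEVERE, "Error Table write failed for instant " + instantTime, e);
        }

        LogRecord captured = handler.records.get(0);
        System.out.println(captured.getMessage());
        System.out.println(captured.getThrown().getClass().getSimpleName());
    }
}
```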
   



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java:
##########
@@ -51,10 +62,56 @@ public final class SourceFormatAdapter implements Closeable {
 
   private final Source source;
 
+  private Option<BaseErrorTableWriter> errorTableWriterInterface = Option.empty();

Review Comment:
   Let's ensure we have consistent naming:
   `errorTableWriter`



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java:
##########
@@ -494,9 +505,38 @@ private Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> fetchFromSourc
       Option<Dataset<Row>> transformed =
           dataAndCheckpoint.getBatch().map(data -> transformer.get().apply(jssc, sparkSession, data, props));
 
+      transformed = formatAdapter.processErrorEvents(transformed,
+          ErrorEvent.ErrorReason.CUSTOM_TRANSFORMER_FAILURE);
+
       checkpointStr = dataAndCheckpoint.getCheckpointForNextBatch();
       boolean reconcileSchema = props.getBoolean(DataSourceWriteOptions.RECONCILE_SCHEMA().key());
       if (this.userProvidedSchemaProvider != null && this.userProvidedSchemaProvider.getTargetSchema() != null) {
+        // If the target schema is specified through Avro schema,
+        // pass in the schema for the Row-to-Avro conversion
+        // to avoid nullability mismatch between Avro schema and Row schema
+        Option<BaseErrorTableWriter> schemaValidationErrorWriter =

Review Comment:
   errorTableWriterForSchemaValidation



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/ErrorEvent.java:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.deltastreamer;
+
+public class ErrorEvent<T> {

Review Comment:
   Add Javadoc.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/ErrorEvent.java:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.deltastreamer;
+
+public class ErrorEvent<T> {
+
+  ErrorReason reason;
+  T payload;
+
+  public ErrorEvent(T payload, ErrorReason reason) {
+    this.payload = payload;
+    this.reason = reason;
+  }
+
+  public T getPayload() {
+    return payload;
+  }
+
+  public ErrorReason getReason() {
+    return reason;
+  }
+
+  public enum ErrorReason {

Review Comment:
   Add Javadoc.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/ErrorEvent.java:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.deltastreamer;
+
+public class ErrorEvent<T> {
+
+  ErrorReason reason;
+  T payload;
+
+  public ErrorEvent(T payload, ErrorReason reason) {
+    this.payload = payload;
+    this.reason = reason;
+  }
+
+  public T getPayload() {
+    return payload;
+  }
+
+  public ErrorReason getReason() {
+    return reason;
+  }
+
+  public enum ErrorReason {
+    JSON_AVRO_DESERIALIZATION_FAILURE,
+    JSON_ROW_DESERIALIZATION_FAILURE,
+    AVRO_DESERIALIZATION_FAILURE,

Review Comment:
   We should try to add docs for each entry here.
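To make the entries concrete once documented, here is a hypothetical, self-contained mirror of the ErrorEvent wrapper (the real class is in org.apache.hudi.utilities.deltastreamer; the toErrorEvent helper and the inline comments on the reasons are illustrative assumptions, not the project's official definitions):

```java
// Standalone mirror of the ErrorEvent<T> pattern from the patch, for illustration only.
public class ErrorEventDemo {

    // Assumed meanings (to be confirmed in the real Javadoc):
    enum ErrorReason {
        JSON_AVRO_DESERIALIZATION_FAILURE,  // JSON source record could not be converted to Avro
        JSON_ROW_DESERIALIZATION_FAILURE,   // JSON source record could not be converted to a Row
        AVRO_DESERIALIZATION_FAILURE        // Row could not be converted to the target Avro schema
    }

    static final class ErrorEvent<T> {
        private final ErrorReason reason;
        private final T payload;

        ErrorEvent(T payload, ErrorReason reason) {
            this.payload = payload;
            this.reason = reason;
        }

        T getPayload() { return payload; }
        ErrorReason getReason() { return reason; }
    }

    // Hypothetical helper: tag a raw record that failed JSON-to-Avro conversion.
    static ErrorEvent<String> toErrorEvent(String rawRecord) {
        return new ErrorEvent<>(rawRecord, ErrorReason.JSON_AVRO_DESERIALIZATION_FAILURE);
    }

    public static void main(String[] args) {
        ErrorEvent<String> event = toErrorEvent("{\"id\": \"oops\""); // truncated JSON
        System.out.println(event.getReason() + " -> " + event.getPayload());
    }
}
```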



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieErrorTableConfig.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.config;
+
+import org.apache.hudi.common.config.ConfigClassProperty;
+import org.apache.hudi.common.config.ConfigGroups;
+import org.apache.hudi.common.config.ConfigProperty;
+
+import javax.annotation.concurrent.Immutable;
+
+import java.util.Arrays;
+
+@Immutable
+@ConfigClassProperty(name = "Error table Configs",
+    groupName = ConfigGroups.Names.WRITE_CLIENT,
+    description = "Configurations that are required for Error table configs")
+public class HoodieErrorTableConfig {
+  public static final ConfigProperty<Boolean> ERROR_TABLE_ENABLED = ConfigProperty
+      .key("hoodie.deltastreamer.errortable.enable")
+      .defaultValue(false)
+      .withDocumentation("Config to enable error table. If the config is enabled, "
+          + "all the records with processing error in DeltaStreamer are transferred to error table.");
+
+  public static final ConfigProperty<String> ERROR_TABLE_BASE_PATH = ConfigProperty
+      .key("hoodie.deltastreamer.errortable.base.path")
+      .noDefaultValue()
+      .withDocumentation("Base path for error table under which all error records "
+          + "would be stored.");
+
+  public static final ConfigProperty<String> ERROR_TARGET_TABLE = ConfigProperty
+      .key("hoodie.deltastreamer.errortable.target.table.name")
+      .noDefaultValue()
+      .withDocumentation("Table name to be used for the error table");
+
+  public static final ConfigProperty<Integer> ERROR_TABLE_UPSERT_PARALLELISM_VALUE = ConfigProperty
+      .key("hoodie.deltastreamer.errortable.upsert.shuffle.parallelism")
+      .defaultValue(200)
+      .withDocumentation("Config to set upsert shuffle parallelism. The config is similar to "
+          + "hoodie.upsert.shuffle.parallelism config but applies to the error table.");
+
+  public static final ConfigProperty<Integer> ERROR_TABLE_INSERT_PARALLELISM_VALUE = ConfigProperty
+      .key("hoodie.deltastreamer.errortable.insert.shuffle.parallelism")
+      .defaultValue(200)
+      .withDocumentation("Config to set insert shuffle parallelism. The config is similar to "
+          + "hoodie.insert.shuffle.parallelism config but applies to the error table.");
+
+  public static final ConfigProperty<String> ERROR_TABLE_WRITE_CLASS = ConfigProperty
+      .key("hoodie.deltastreamer.errortable.write.class")
+      .noDefaultValue()
+      .withDocumentation("Class which handles the error table writes. This config is used to configure "
+          + "a custom implementation for Error Table Writer. Specify the full class name of the custom "
+          + "error table writer as a value for this config");
+
+  public static final ConfigProperty<Boolean> ERROR_ENABLE_VALIDATE_TARGET_SCHEMA = ConfigProperty
+      .key("hoodie.deltastreamer.errortable.validate.targetschema.enable")
+      .defaultValue(false)
+      .withDocumentation("Records with schema mismatch with Target Schema are sent to Error Table.");
+
+  public static final ConfigProperty<String> ERROR_TABLE_WRITE_FAILURE_STRATEGY = ConfigProperty
+      .key("hoodie.deltastreamer.errortable.write.failure.strategy")
+      .defaultValue(ErrorWriteFailureStrategy.ROLLBACK_COMMIT.name())
+      .withDocumentation("The config specifies the failure strategy if error table write fails. "
+          + "Use one of - " + Arrays.toString(ErrorWriteFailureStrategy.values()));

Review Comment:
   Maybe not required in this patch, but let's see if it makes sense to add another config for the following scenario:
   if the error table is enabled, but there is some (uncaught) failure while processing the records to the error table, how do we want to proceed? As per this patch, I guess the regular writes may fail, since I don't see any try/finally block. But I wonder if we should give the user a config knob to either proceed without failing the regular writes, or fail the regular writes to the data table as well.
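For reference, a hypothetical properties snippet wiring together the configs defined in this file (the base path and table name are placeholders; the failure-strategy values come from the ErrorWriteFailureStrategy enum used in the DeltaSync switch above):

```properties
# Route records that fail processing to a separate error table
hoodie.deltastreamer.errortable.enable=true
hoodie.deltastreamer.errortable.base.path=s3://my-bucket/warehouse/my_table_errors
hoodie.deltastreamer.errortable.target.table.name=my_table_errors
# Behavior when the error-table write itself fails: ROLLBACK_COMMIT or LOG_ERROR
hoodie.deltastreamer.errortable.write.failure.strategy=ROLLBACK_COMMIT
```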
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [hudi] nsivabalan commented on a diff in pull request #7982: [HUDI-5813] Error table support for error events

Posted by "nsivabalan (via GitHub)" <gi...@apache.org>.
nsivabalan commented on code in PR #7982:
URL: https://github.com/apache/hudi/pull/7982#discussion_r1118380323


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java:
##########
@@ -710,6 +765,26 @@ private Pair<Option<String>, JavaRDD<WriteStatus>> writeToSink(JavaRDD<HoodieRec
             + totalErrorRecords + "/" + totalRecords);
       }
       String commitActionType = CommitUtils.getCommitActionType(cfg.operation, HoodieTableType.valueOf(cfg.tableType));

Review Comment:
   I don't think we need to address this right away, but let's think about whether it makes sense:
   for error records from the WriteStatus RDD, should we also route them to the error table? I vaguely remember a user from the community asking, a long time back, whether Hudi could ignore such error records and commit the rest of the valid data.
   Since we are adding support for an error table, maybe we should consider routing failed records to the error table as well.
   





[GitHub] [hudi] hudi-bot commented on pull request #7982: [HUDI-5813] Error table support for error events

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #7982:
URL: https://github.com/apache/hudi/pull/7982#issuecomment-1447660104

   ## CI report:
   
   * 5e157e5bd199cc6e4828f00cb4b81d095eee3e5a UNKNOWN
   * dfadbc7d9c1b499d518a6af3d8b78cf169e3a0c6 UNKNOWN
   * ab98a8996106108a6afad74de71c69f229cd550d Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15442) 
   * 242735ddb5c277194fc7a6ea42b8dd0ca9bbe594 Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15466) 
   * 252c184b2499599f428ee499231ebfd945628493 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>




[GitHub] [hudi] hudi-bot commented on pull request #7982: [HUDI-5813] Error table support for error events

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #7982:
URL: https://github.com/apache/hudi/pull/7982#issuecomment-1447665516

   ## CI report:
   
   * 5e157e5bd199cc6e4828f00cb4b81d095eee3e5a UNKNOWN
   * dfadbc7d9c1b499d518a6af3d8b78cf169e3a0c6 UNKNOWN
   * 242735ddb5c277194fc7a6ea42b8dd0ca9bbe594 Azure: [CANCELED](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15466) 
   * 252c184b2499599f428ee499231ebfd945628493 Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15471) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>




[GitHub] [hudi] lokeshj1703 commented on a diff in pull request #7982: [HUDI-5813] Error table support for error events

Posted by "lokeshj1703 (via GitHub)" <gi...@apache.org>.
lokeshj1703 commented on code in PR #7982:
URL: https://github.com/apache/hudi/pull/7982#discussion_r1118696059


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/BaseErrorTableWriter.java:
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.deltastreamer;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.VisibleForTesting;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SparkSession;
+
+/**
+ * The class that handles error events when Delta Streamer syncs data from the source. All
+ * records that Delta Streamer is not able to process are forwarded as error events to
+ * BaseErrorTableWriter. The implementation of BaseErrorTableWriter processes
+ * these error events through the addErrorEvents API and commits them to the error table when
+ * the upsertAndCommit API is called.
+ *
+ * The writer can use the configs defined in HoodieErrorTableConfig to manage the error table.
+ */
+public abstract class BaseErrorTableWriter<T extends ErrorEvent> {
+
+  // The column name passed to Spark for the option `columnNameOfCorruptRecord`.
+  // The corrupt record is stored in this column when parsing fails.
+  public static String ERROR_TABLE_CURRUPT_RECORD_COL_NAME = "_corrupt_record";
+
+  public BaseErrorTableWriter(HoodieDeltaStreamer.Config cfg, SparkSession sparkSession,
+                                   TypedProperties props, JavaSparkContext jssc, FileSystem fs) {
+  }
+
+  /**
+   * Processes input error events. These error events are committed later through the
+   * upsertAndCommit API call.
+   *
+   * @param errorEvent Input error event RDD
+   */
+  public abstract void addErrorEvents(JavaRDD<T> errorEvent);
+
+  /**
+   * Fetches the error events RDD processed by the writer so far. This is a test API.
+   */
+  @VisibleForTesting
+  public abstract Option<JavaRDD<HoodieAvroRecord>> getErrorEvents(String baseTableInstantTime, Option<String> commitedInstantTime);
+
+  /**
+   * This API is called to commit the error events (failed Hoodie records) processed by the
+   * writer so far. These records are committed to an error table.
+   */
+  public abstract boolean upsertAndCommit(String baseTableInstantTime, Option<String> commitedInstantTime);

Review Comment:
   Created https://issues.apache.org/jira/browse/HUDI-5858 for it.
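
   For readers following the thread, the lifecycle the javadoc above describes (buffer error events with `addErrorEvents`, then flush them atomically with `upsertAndCommit`) can be sketched in plain Python. This is a conceptual mirror only, not a Hudi API: the class name, method names, and return shape here are illustrative stand-ins for the Java abstract class.

```python
# Minimal sketch of the BaseErrorTableWriter lifecycle: events are buffered
# until upsertAndCommit flushes them under the base table's instant time.
class InMemoryErrorTableWriter:
    def __init__(self):
        self._pending = []  # buffered error events awaiting commit

    def add_error_events(self, events):
        # Buffer error events; nothing is persisted until upsert_and_commit().
        self._pending.extend(events)

    def upsert_and_commit(self, base_table_instant):
        # Commit all buffered events under the given base-table instant and
        # clear the buffer; the caller (DeltaSync) fails the sync if this fails.
        committed = [(base_table_instant, e) for e in self._pending]
        self._pending = []
        return committed


writer = InMemoryErrorTableWriter()
writer.add_error_events(["bad-record-1", "bad-record-2"])
committed = writer.upsert_and_commit("20230216120625")
```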





[GitHub] [hudi] hudi-bot commented on pull request #7982: [HUDI-5813] Quarantine table support for error events

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #7982:
URL: https://github.com/apache/hudi/pull/7982#issuecomment-1434757595

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "a5cad5f60143509cf837f8b01467b660fefa8f77",
       "status" : "FAILURE",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15246",
       "triggerID" : "a5cad5f60143509cf837f8b01467b660fefa8f77",
       "triggerType" : "PUSH"
     }, {
       "hash" : "30f2fc136656bbe0567dd7c0cf15916e0cc06af4",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "30f2fc136656bbe0567dd7c0cf15916e0cc06af4",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * a5cad5f60143509cf837f8b01467b660fefa8f77 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15246) 
   * 30f2fc136656bbe0567dd7c0cf15916e0cc06af4 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>




[GitHub] [hudi] hudi-bot commented on pull request #7982: [HUDI-5813] Quarantine table support for error events

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #7982:
URL: https://github.com/apache/hudi/pull/7982#issuecomment-1439736288

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "a5cad5f60143509cf837f8b01467b660fefa8f77",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15246",
       "triggerID" : "a5cad5f60143509cf837f8b01467b660fefa8f77",
       "triggerType" : "PUSH"
     }, {
       "hash" : "30f2fc136656bbe0567dd7c0cf15916e0cc06af4",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15278",
       "triggerID" : "30f2fc136656bbe0567dd7c0cf15916e0cc06af4",
       "triggerType" : "PUSH"
     }, {
       "hash" : "1b159da06fc8f6521ecfb5e047955e0ba3156730",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15296",
       "triggerID" : "1b159da06fc8f6521ecfb5e047955e0ba3156730",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f705e62c81684f03d025c614a180673bc16da74b",
       "status" : "SUCCESS",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15314",
       "triggerID" : "f705e62c81684f03d025c614a180673bc16da74b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "667b1ef4eb7d9e344119d63f75bdcb8ccd1664b7",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "667b1ef4eb7d9e344119d63f75bdcb8ccd1664b7",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * f705e62c81684f03d025c614a180673bc16da74b Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15314) 
   * 667b1ef4eb7d9e344119d63f75bdcb8ccd1664b7 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>




[GitHub] [hudi] lokeshj1703 commented on a diff in pull request #7982: [HUDI-5813] Quarantine table support for error events

Posted by "lokeshj1703 (via GitHub)" <gi...@apache.org>.
lokeshj1703 commented on code in PR #7982:
URL: https://github.com/apache/hudi/pull/7982#discussion_r1111812531


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java:
##########
@@ -236,6 +239,8 @@ public class DeltaSync implements Serializable, Closeable {
    */
   private transient SparkRDDWriteClient writeClient;
 
+  private Option<BaseQuarantineTableWriter> quarantineTableWriterInterfaceImpl = Option.empty();
+

Review Comment:
   Addressed in latest commit.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java:
##########
@@ -50,10 +61,55 @@ public final class SourceFormatAdapter implements Closeable {
 
   private final Source source;
 
+  private Option<BaseQuarantineTableWriter> quarantineTableWriterInterface = Option.empty();
+
   public SourceFormatAdapter(Source source) {
     this.source = source;
   }
 
+  public SourceFormatAdapter(Source source, Option<BaseQuarantineTableWriter> quarantineTableWriterInterface) {

Review Comment:
   Addressed in latest commit.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java:
##########
@@ -710,6 +738,18 @@ private Pair<Option<String>, JavaRDD<WriteStatus>> writeToSink(JavaRDD<HoodieRec
             + totalErrorRecords + "/" + totalRecords);
       }
       String commitActionType = CommitUtils.getCommitActionType(cfg.operation, HoodieTableType.valueOf(cfg.tableType));
+      if (quarantineTableWriterInterfaceImpl.isPresent()) {
+        String qurantineTableStartInstant = quarantineTableWriterInterfaceImpl.get().startCommit();
+        quarantineTableWriterInterfaceImpl.get().addErrorEvents(getErrorEventsForWriteStatus(writeStatusRDD));
+        Option<String> commitedInstantTime = getLatestInstantWithValidCheckpointInfo(commitTimelineOpt);
+        boolean quarantineTableSuccess = quarantineTableWriterInterfaceImpl.get().upsertAndCommit(qurantineTableStartInstant, instantTime, commitedInstantTime);
+        if (!quarantineTableSuccess) {
+          LOG.info("Qurantine Table Commit " + qurantineTableStartInstant + " failed!");
+          LOG.info("Commit " + instantTime + " failed!");
+          writeClient.rollback(instantTime);

Review Comment:
   Addressed in latest commit.





[GitHub] [hudi] hudi-bot commented on pull request #7982: [HUDI-5813] Error table support for error events

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #7982:
URL: https://github.com/apache/hudi/pull/7982#issuecomment-1441647690

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "a5cad5f60143509cf837f8b01467b660fefa8f77",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15246",
       "triggerID" : "a5cad5f60143509cf837f8b01467b660fefa8f77",
       "triggerType" : "PUSH"
     }, {
       "hash" : "30f2fc136656bbe0567dd7c0cf15916e0cc06af4",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15278",
       "triggerID" : "30f2fc136656bbe0567dd7c0cf15916e0cc06af4",
       "triggerType" : "PUSH"
     }, {
       "hash" : "1b159da06fc8f6521ecfb5e047955e0ba3156730",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15296",
       "triggerID" : "1b159da06fc8f6521ecfb5e047955e0ba3156730",
       "triggerType" : "PUSH"
     }, {
       "hash" : "f705e62c81684f03d025c614a180673bc16da74b",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15314",
       "triggerID" : "f705e62c81684f03d025c614a180673bc16da74b",
       "triggerType" : "PUSH"
     }, {
       "hash" : "667b1ef4eb7d9e344119d63f75bdcb8ccd1664b7",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15338",
       "triggerID" : "667b1ef4eb7d9e344119d63f75bdcb8ccd1664b7",
       "triggerType" : "PUSH"
     }, {
       "hash" : "b6ef7c09ed66456fa285d6136360ed35087c3de7",
       "status" : "DELETED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15339",
       "triggerID" : "b6ef7c09ed66456fa285d6136360ed35087c3de7",
       "triggerType" : "PUSH"
     }, {
       "hash" : "5e157e5bd199cc6e4828f00cb4b81d095eee3e5a",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "5e157e5bd199cc6e4828f00cb4b81d095eee3e5a",
       "triggerType" : "PUSH"
     }, {
       "hash" : "016ef6eb042f6181d35095866978a9d4744c7b14",
       "status" : "CANCELED",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15359",
       "triggerID" : "016ef6eb042f6181d35095866978a9d4744c7b14",
       "triggerType" : "PUSH"
     }, {
       "hash" : "8ae41d105c88a5e21ac4692ace7f4b2cd4c834b6",
       "status" : "PENDING",
       "url" : "https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15361",
       "triggerID" : "8ae41d105c88a5e21ac4692ace7f4b2cd4c834b6",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 5e157e5bd199cc6e4828f00cb4b81d095eee3e5a UNKNOWN
   * 016ef6eb042f6181d35095866978a9d4744c7b14 Azure: [CANCELED](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15359) 
   * 8ae41d105c88a5e21ac4692ace7f4b2cd4c834b6 Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15361) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>




[GitHub] [hudi] hudi-bot commented on pull request #7982: [HUDI-5813] Quarantine table support for error events

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #7982:
URL: https://github.com/apache/hudi/pull/7982#issuecomment-1433035517

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "a5cad5f60143509cf837f8b01467b660fefa8f77",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "a5cad5f60143509cf837f8b01467b660fefa8f77",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * a5cad5f60143509cf837f8b01467b660fefa8f77 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>




[GitHub] [hudi] codope commented on a diff in pull request #7982: [HUDI-5813] Quarantine table support for error events

Posted by "codope (via GitHub)" <gi...@apache.org>.
codope commented on code in PR #7982:
URL: https://github.com/apache/hudi/pull/7982#discussion_r1109675049


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/BaseQuarantineTableWriter.java:
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.deltastreamer;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SparkSession;
+
+public abstract class BaseQuarantineTableWriter<T extends QuarantineEvent> {

Review Comment:
   Should we rename to `BaseHoodieAuditTableWriter`, `HoodieAuditTableConfigs`, `AuditEvent`, `AuditErrorEvent` and `AuditUtils`? Basically, I am suggesting evolving this into a more general-purpose audit table. Error is just one type of event; others could keep a record of updates/deletions in this audit table.
   Wdyt? @xushiyan @lokeshj1703 @nsivabalan @harsh1231 





[GitHub] [hudi] lokeshj1703 commented on a diff in pull request #7982: [HUDI-5813] Error table support for error events

Posted by "lokeshj1703 (via GitHub)" <gi...@apache.org>.
lokeshj1703 commented on code in PR #7982:
URL: https://github.com/apache/hudi/pull/7982#discussion_r1118703053


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java:
##########
@@ -118,11 +177,24 @@ public InputBatch<Dataset<Row>> fetchNewDataInRowFormat(Option<String> lastCkptS
       case JSON: {
         InputBatch<JavaRDD<String>> r = ((Source<JavaRDD<String>>) source).fetchNext(lastCkptStr, sourceLimit);
         Schema sourceSchema = r.getSchemaProvider().getSourceSchema();
-        StructType dataType = AvroConversionUtils.convertAvroSchemaToStructType(sourceSchema);
-        return new InputBatch<>(
-            Option.ofNullable(
-                r.getBatch().map(rdd -> source.getSparkSession().read().schema(dataType).json(rdd)).orElse(null)),
-            r.getCheckpointForNextBatch(), r.getSchemaProvider());
+        if (errorTableWriterInterface.isPresent()) {
+          StructType dataType = AvroConversionUtils.convertAvroSchemaToStructType(sourceSchema)

Review Comment:
   Addressed.





[GitHub] [hudi] lokeshj1703 commented on a diff in pull request #7982: [HUDI-5813] Error table support for error events

Posted by "lokeshj1703 (via GitHub)" <gi...@apache.org>.
lokeshj1703 commented on code in PR #7982:
URL: https://github.com/apache/hudi/pull/7982#discussion_r1118491750


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieErrorTableConfig.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.config;
+
+import org.apache.hudi.common.config.ConfigClassProperty;
+import org.apache.hudi.common.config.ConfigGroups;
+import org.apache.hudi.common.config.ConfigProperty;
+
+import javax.annotation.concurrent.Immutable;
+
+import java.util.Arrays;
+
+@Immutable
+@ConfigClassProperty(name = "Error table Configs",
+    groupName = ConfigGroups.Names.WRITE_CLIENT,
+    description = "Configurations required for the error table")
+public class HoodieErrorTableConfig {
+  public static final ConfigProperty<Boolean> ERROR_TABLE_ENABLED = ConfigProperty
+      .key("hoodie.deltastreamer.errortable.enable")
+      .defaultValue(false)
+      .withDocumentation("Config to enable the error table. If enabled, "
+          + "all records that fail processing in DeltaStreamer are transferred to the error table.");
+
+  public static final ConfigProperty<String> ERROR_TABLE_BASE_PATH = ConfigProperty
+      .key("hoodie.deltastreamer.errortable.base.path")
+      .noDefaultValue()
+      .withDocumentation("Base path for error table under which all error records "
+          + "would be stored.");
+
+  public static final ConfigProperty<String> ERROR_TARGET_TABLE = ConfigProperty
+      .key("hoodie.deltastreamer.errortable.target.table.name")
+      .noDefaultValue()
+      .withDocumentation("Table name to be used for the error table");
+
+  public static final ConfigProperty<Integer> ERROR_TABLE_UPSERT_PARALLELISM_VALUE = ConfigProperty
+      .key("hoodie.deltastreamer.errortable.upsert.shuffle.parallelism")
+      .defaultValue(200)
+      .withDocumentation("Config to set upsert shuffle parallelism. The config is similar to "
+          + "hoodie.upsert.shuffle.parallelism config but applies to the error table.");
+
+  public static final ConfigProperty<Integer> ERROR_TABLE_INSERT_PARALLELISM_VALUE = ConfigProperty
+      .key("hoodie.deltastreamer.errortable.insert.shuffle.parallelism")
+      .defaultValue(200)
+      .withDocumentation("Config to set insert shuffle parallelism. The config is similar to "
+          + "hoodie.insert.shuffle.parallelism config but applies to the error table.");
+
+  public static final ConfigProperty<String> ERROR_TABLE_WRITE_CLASS = ConfigProperty
+      .key("hoodie.deltastreamer.errortable.write.class")
+      .noDefaultValue()
+      .withDocumentation("Class which handles the error table writes. This config is used to configure "
+          + "a custom implementation for Error Table Writer. Specify the full class name of the custom "
+          + "error table writer as a value for this config");
+
+  public static final ConfigProperty<Boolean> ERROR_ENABLE_VALIDATE_TARGET_SCHEMA = ConfigProperty
+      .key("hoodie.deltastreamer.errortable.validate.targetschema.enable")
+      .defaultValue(false)
+      .withDocumentation("Records whose schema does not match the target schema are sent to the error table.");
+
+  public static final ConfigProperty<String> ERROR_TABLE_WRITE_FAILURE_STRATEGY = ConfigProperty
+      .key("hoodie.deltastreamer.errortable.write.failure.strategy")
+      .defaultValue(ErrorWriteFailureStrategy.ROLLBACK_COMMIT.name())
+      .withDocumentation("The config specifies the failure strategy if error table write fails. "
+          + "Use one of - " + Arrays.toString(ErrorWriteFailureStrategy.values()));
+
+  public enum ErrorWriteFailureStrategy {
+    ROLLBACK_COMMIT("Rollback the corresponding base table write commit for which the error events were triggered"),

Review Comment:
   Discussed offline, we can keep it as ROLLBACK_COMMIT since the strategy comes into play when the base table write succeeds but the error table write fails.
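
   The semantics settled on here (the failure strategy fires only when the base table write succeeded but the error table write did not) can be sketched as a small dispatch. This is an illustration, not Hudi code: the function name is made up, and `LOG_ERROR` is assumed here as the non-fatal alternative to `ROLLBACK_COMMIT`.

```python
from enum import Enum


class ErrorWriteFailureStrategy(Enum):
    # ROLLBACK_COMMIT mirrors the config value in the diff above;
    # LOG_ERROR is an assumed non-fatal alternative for illustration.
    ROLLBACK_COMMIT = "rollback"
    LOG_ERROR = "log"


def handle_error_table_write_failure(strategy, base_instant, actions):
    """Invoked only when the base table write succeeded but the error table
    write failed, matching the semantics clarified above."""
    if strategy is ErrorWriteFailureStrategy.ROLLBACK_COMMIT:
        # Undo the base table commit, then surface the failure to fail the sync.
        actions.append(f"rollback {base_instant}")
        raise RuntimeError(f"error table write failed; rolled back commit {base_instant}")
    # Non-fatal path: record the failure and let the sync proceed.
    actions.append(f"logged error table failure for {base_instant}")


actions = []
handle_error_table_write_failure(ErrorWriteFailureStrategy.LOG_ERROR, "20230216120625", actions)
```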





[GitHub] [hudi] lokeshj1703 commented on a diff in pull request #7982: [HUDI-5813] Error table support for error events

Posted by "lokeshj1703 (via GitHub)" <gi...@apache.org>.
lokeshj1703 commented on code in PR #7982:
URL: https://github.com/apache/hudi/pull/7982#discussion_r1118696343


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java:
##########
@@ -494,9 +505,38 @@ private Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> fetchFromSourc
       Option<Dataset<Row>> transformed =
           dataAndCheckpoint.getBatch().map(data -> transformer.get().apply(jssc, sparkSession, data, props));
 
+      transformed = formatAdapter.processErrorEvents(transformed,
+          ErrorEvent.ErrorReason.CUSTOM_TRANSFORMER_FAILURE);
+
       checkpointStr = dataAndCheckpoint.getCheckpointForNextBatch();
       boolean reconcileSchema = props.getBoolean(DataSourceWriteOptions.RECONCILE_SCHEMA().key());
       if (this.userProvidedSchemaProvider != null && this.userProvidedSchemaProvider.getTargetSchema() != null) {
+        // If the target schema is specified through Avro schema,
+        // pass in the schema for the Row-to-Avro conversion
+        // to avoid nullability mismatch between Avro schema and Row schema
+        Option<BaseErrorTableWriter> schemaValidationErrorWriter =
+            (errorTableWriter.isPresent()
+                && props.getBoolean(HoodieErrorTableConfig.ERROR_ENABLE_VALIDATE_TARGET_SCHEMA.key(),
+                HoodieErrorTableConfig.ERROR_ENABLE_VALIDATE_TARGET_SCHEMA.defaultValue()))
+                ? errorTableWriter : Option.empty();
+        avroRDDOptional = transformed
+            .map(row ->
+                schemaValidationErrorWriter
+                    .map(impl -> {
+                      Tuple2<RDD<GenericRecord>, RDD<String>> safeCreateRDDs = HoodieSparkUtils.safeCreateRDD(row,
+                          HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE, reconcileSchema,
+                          Option.of(this.userProvidedSchemaProvider.getTargetSchema())
+                      );
+                      impl.addErrorEvents(safeCreateRDDs._2().toJavaRDD()
+                          .map(evStr -> new ErrorEvent<>(evStr,
+                              ErrorEvent.ErrorReason.AVRO_DESERIALIZATION_FAILURE)));
+                      return safeCreateRDDs._1();
+                    })
+                    .orElseGet(() -> HoodieSparkUtils.createRdd(row,
+                      HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE, reconcileSchema,

Review Comment:
   Addressed.





[GitHub] [hudi] lokeshj1703 commented on a diff in pull request #7982: [HUDI-5813] Error table support for error events

Posted by "lokeshj1703 (via GitHub)" <gi...@apache.org>.
lokeshj1703 commented on code in PR #7982:
URL: https://github.com/apache/hudi/pull/7982#discussion_r1118697753


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/ErrorEvent.java:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.deltastreamer;
+
+public class ErrorEvent<T> {

Review Comment:
   Addressed.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/ErrorEvent.java:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.deltastreamer;
+
+public class ErrorEvent<T> {
+
+  ErrorReason reason;

Review Comment:
   Addressed.





[GitHub] [hudi] lokeshj1703 commented on a diff in pull request #7982: [HUDI-5813] Error table support for error events

Posted by "lokeshj1703 (via GitHub)" <gi...@apache.org>.
lokeshj1703 commented on code in PR #7982:
URL: https://github.com/apache/hudi/pull/7982#discussion_r1118694506


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieErrorTableConfig.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.config;
+
+import org.apache.hudi.common.config.ConfigClassProperty;
+import org.apache.hudi.common.config.ConfigGroups;
+import org.apache.hudi.common.config.ConfigProperty;
+
+import javax.annotation.concurrent.Immutable;
+
+import java.util.Arrays;
+
+@Immutable
+@ConfigClassProperty(name = "Error table Configs",
+    groupName = ConfigGroups.Names.WRITE_CLIENT,
+    description = "Configurations that are required for Error table configs")
+public class HoodieErrorTableConfig {
+  public static final ConfigProperty<Boolean> ERROR_TABLE_ENABLED = ConfigProperty
+      .key("hoodie.deltastreamer.errortable.enable")

Review Comment:
   Created https://issues.apache.org/jira/browse/HUDI-5858 for it.





[GitHub] [hudi] lokeshj1703 commented on a diff in pull request #7982: [HUDI-5813] Quarantine table support for error events

Posted by "lokeshj1703 (via GitHub)" <gi...@apache.org>.
lokeshj1703 commented on code in PR #7982:
URL: https://github.com/apache/hudi/pull/7982#discussion_r1111810522


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java:
##########
@@ -117,11 +174,23 @@ public InputBatch<Dataset<Row>> fetchNewDataInRowFormat(Option<String> lastCkptS
       case JSON: {
         InputBatch<JavaRDD<String>> r = ((Source<JavaRDD<String>>) source).fetchNext(lastCkptStr, sourceLimit);
         Schema sourceSchema = r.getSchemaProvider().getSourceSchema();
-        StructType dataType = AvroConversionUtils.convertAvroSchemaToStructType(sourceSchema);
-        return new InputBatch<>(
-            Option.ofNullable(
-                r.getBatch().map(rdd -> source.getSparkSession().read().schema(dataType).json(rdd)).orElse(null)),
-            r.getCheckpointForNextBatch(), r.getSchemaProvider());
+        if (quarantineTableWriterInterface.isPresent()) {
+          StructType dataType = AvroConversionUtils.convertAvroSchemaToStructType(sourceSchema)
+              .add(new StructField(QUARANTINE_TABLE_CURRUPT_RECORD_COL_NAME, DataTypes.StringType, true, Metadata.empty()));
+          Option<Dataset<Row>> dataset = r.getBatch().map(rdd -> source.getSparkSession().read()
+              .option("mode", "PERMISSIVE").option("columnNameOfCorruptRecord", QUARANTINE_TABLE_CURRUPT_RECORD_COL_NAME).schema(dataType)

Review Comment:
   It is PERMISSIVE by default. https://spark.apache.org/docs/latest/sql-data-sources-json.html
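For readers unfamiliar with Spark's JSON parse modes, the behavior the diff relies on can be sketched outside Spark in plain Python. This is a minimal illustration of the PERMISSIVE-mode semantics only; the column name and sample records below are illustrative, not taken from the PR (the PR uses its own constant for the corrupt-record column).

```python
import json

CORRUPT_COL = "_corrupt_record"  # illustrative name; Spark's default, the PR defines its own
FIELDS = ["id", "name"]          # illustrative schema fields

def permissive_parse(raw_lines):
    """Mimic Spark's PERMISSIVE mode: well-formed rows keep their fields,
    malformed rows get null fields plus the raw text in the corrupt column."""
    rows = []
    for line in raw_lines:
        try:
            parsed = json.loads(line)
            row = {f: parsed.get(f) for f in FIELDS}
            row[CORRUPT_COL] = None
        except json.JSONDecodeError:
            row = {f: None for f in FIELDS}
            row[CORRUPT_COL] = line
        rows.append(row)
    return rows

records = ['{"id": 1, "name": "a"}', '{"id": 2, "name": }']
parsed = permissive_parse(records)
# The second record is malformed, so only it lands in the corrupt column
bad = [r for r in parsed if r[CORRUPT_COL] is not None]
print(len(bad))  # 1
```

Note that in real Spark, when a user-supplied schema is passed, the corrupt-record column must be present in that schema for PERMISSIVE mode to populate it, which is exactly why the diff adds the extra `StructField` before reading.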



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/QuarantineEvent.java:
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.deltastreamer;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.spark.sql.types.DataType;
+
+public abstract class QuarantineEvent<T> {
+
+  QuarantineReason reason;
+  T payload;
+
+  public abstract DataType getPayloadType();
+
+  public abstract String getPayload();
+
+  public QuarantineReason getReason() {
+    return reason;
+  }
+
+  public abstract GenericRecord getAvroPayload();
+
+  public enum QuarantineReason {
+    JSON_AVRO_DESERIALIZATION_FAILURE,

Review Comment:
   Sounds good. What would be the use case for an error code? We could leverage the ordinal() function to get a code as well.






[GitHub] [hudi] lokeshj1703 commented on a diff in pull request #7982: [HUDI-5813] Quarantine table support for error events

Posted by "lokeshj1703 (via GitHub)" <gi...@apache.org>.
lokeshj1703 commented on code in PR #7982:
URL: https://github.com/apache/hudi/pull/7982#discussion_r1114062796


##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala:
##########
@@ -116,10 +116,74 @@ object HoodieSparkUtils extends SparkAdapterSupport with SparkVersionsSupport {
     }, SQLConf.get)
   }
 
-  def getCatalystRowSerDe(structType: StructType) : SparkRowSerDe = {
-    sparkAdapter.createSparkRowSerDe(structType)
-  }
-
   private def injectSQLConf[T: ClassTag](rdd: RDD[T], conf: SQLConf): RDD[T] =
     new SQLConfInjectingRDD(rdd, conf)
+
+  def safeCreateRDD(df: DataFrame, structName: String, recordNamespace: String, reconcileToLatestSchema: Boolean,
+                    latestTableSchema: org.apache.hudi.common.util.Option[Schema] = org.apache.hudi.common.util.Option.empty()):
+  Tuple2[RDD[GenericRecord], RDD[String]] = {
+    var latestTableSchemaConverted: Option[Schema] = None
+
+    if (latestTableSchema.isPresent && reconcileToLatestSchema) {
+      latestTableSchemaConverted = Some(latestTableSchema.get())
+    } else {
+      // cases when users want to use latestTableSchema but have not turned on reconcileToLatestSchema explicitly
+      // for example, when using a Transformer implementation to transform source RDD to target RDD
+      latestTableSchemaConverted = if (latestTableSchema.isPresent) Some(latestTableSchema.get()) else None
+    }
+    safeCreateRDD(df, structName, recordNamespace, latestTableSchemaConverted);
+  }
+
+  def safeCreateRDD(df: DataFrame, structName: String, recordNamespace: String, readerAvroSchemaOpt: Option[Schema]):
+  Tuple2[RDD[GenericRecord], RDD[String]] = {
+    val writerSchema = df.schema
+    val writerAvroSchema = AvroConversionUtils.convertStructTypeToAvroSchema(writerSchema, structName, recordNamespace)
+    val readerAvroSchema = readerAvroSchemaOpt.getOrElse(writerAvroSchema)
+    // We check whether passed in reader schema is identical to writer schema to avoid costly serde loop of
+    // making Spark deserialize its internal representation [[InternalRow]] into [[Row]] for subsequent conversion
+    // (and back)
+    val sameSchema = writerAvroSchema.equals(readerAvroSchema)
+    val (nullable, _) = AvroConversionUtils.resolveAvroTypeNullability(writerAvroSchema)
+
+    // NOTE: We have to serialize Avro schema, and then subsequently parse it on the executor node, since Spark
+    //       serializer is not able to digest it
+    val writerAvroSchemaStr = writerAvroSchema.toString
+    // NOTE: We're accessing toRdd here directly to avoid [[InternalRow]] to [[Row]] conversion
+
+    if (!sameSchema) {
+      val rdds: RDD[Either[GenericRecord, String]] = df.queryExecution.toRdd.mapPartitions { rows =>
+        if (rows.isEmpty) {
+          Iterator.empty
+        } else {
+          val writerAvroSchema = new Schema.Parser().parse(writerAvroSchemaStr)
+          val convert = AvroConversionUtils.createInternalRowToAvroConverter(writerSchema, writerAvroSchema, nullable = nullable)
+          val rowDeserializer = getCatalystRowSerDe(writerSchema)
+          val transform: InternalRow => Either[GenericRecord, String] = internalRow => try {
+            Left(HoodieAvroUtils.rewriteRecordDeep(convert(internalRow), readerAvroSchema, true))
+          } catch {
+            case _: Throwable =>
+              val rdd = df.sparkSession.sparkContext.parallelize(Seq(rowDeserializer.deserializeRow(internalRow)))
+              Right(df.sqlContext.createDataFrame(rdd, writerSchema).toJSON.first())

Review Comment:
   Is this the right way to create JSON from a Row? Row provides a json() function, but that is only available from Spark 3.
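A lighter-weight alternative to building a single-row DataFrame just to call toJSON() is to serialize the row's values against the schema's field names directly. The sketch below is outside Spark, with illustrative field names; in Spark it would correspond to pairing the schema's fieldNames with the deserialized row's values rather than round-tripping through createDataFrame.

```python
import json

def row_to_json(schema_fields, values):
    """Serialize one row to a JSON string using the schema's field names,
    without constructing a one-row DataFrame just to call toJSON()."""
    return json.dumps(dict(zip(schema_fields, values)))

print(row_to_json(["id", "name"], [7, "x"]))  # {"id": 7, "name": "x"}
```

This also sidesteps the cost concern raised in the quoted code, since nothing driver-side (SparkContext, SQLContext) is needed per failing record.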





[GitHub] [hudi] codope commented on a diff in pull request #7982: [HUDI-5813] Error table support for error events

Posted by "codope (via GitHub)" <gi...@apache.org>.
codope commented on code in PR #7982:
URL: https://github.com/apache/hudi/pull/7982#discussion_r1114434016


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/QuarantineEvent.java:
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.deltastreamer;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.spark.sql.types.DataType;
+
+public abstract class QuarantineEvent<T> {
+
+  QuarantineReason reason;
+  T payload;
+
+  public abstract DataType getPayloadType();
+
+  public abstract String getPayload();
+
+  public QuarantineReason getReason() {
+    return reason;
+  }
+
+  public abstract GenericRecord getAvroPayload();
+
+  public enum QuarantineReason {
+    JSON_AVRO_DESERIALIZATION_FAILURE,

Review Comment:
   Right, and users may want to standardize on the error codes.
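The fragility of ordinal-style codes that this exchange hints at can be sketched briefly. Only `JSON_AVRO_DESERIALIZATION_FAILURE` is taken from the PR; the other reason names and the numeric values are hypothetical, chosen just to illustrate why explicit codes persist safely while declaration-order ordinals do not.

```python
from enum import Enum

class QuarantineReason(Enum):
    # Explicit, stable codes: safe to persist in an error table and to
    # reorder or extend the enum later. Values here are hypothetical.
    JSON_AVRO_DESERIALIZATION_FAILURE = 100
    AVRO_DESERIALIZATION_FAILURE = 101   # hypothetical reason
    RECORD_CREATION_FAILURE = 102        # hypothetical reason

def to_code(reason: QuarantineReason) -> int:
    return reason.value

# The ordinal-style alternative (position in declaration order, like Java's
# ordinal()) changes silently if a new reason is inserted above it:
ordinal = list(QuarantineReason).index(QuarantineReason.AVRO_DESERIALIZATION_FAILURE)
print(to_code(QuarantineReason.JSON_AVRO_DESERIALIZATION_FAILURE))  # 100
```

Records already written with an ordinal-derived code would be misinterpreted after any such reordering, which is the standardization concern here.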





[GitHub] [hudi] lokeshj1703 commented on pull request #7982: [HUDI-5813] Error table support for error events

Posted by "lokeshj1703 (via GitHub)" <gi...@apache.org>.
lokeshj1703 commented on PR #7982:
URL: https://github.com/apache/hudi/pull/7982#issuecomment-1441516480

   Created https://issues.apache.org/jira/browse/HUDI-5836 for follow up tasks.




[GitHub] [hudi] hudi-bot commented on pull request #7982: [HUDI-5813] Error table support for error events

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #7982:
URL: https://github.com/apache/hudi/pull/7982#issuecomment-1441861389

   ## CI report:
   
   * 5e157e5bd199cc6e4828f00cb4b81d095eee3e5a UNKNOWN
   * 8ae41d105c88a5e21ac4692ace7f4b2cd4c834b6 Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15361) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>




[GitHub] [hudi] hudi-bot commented on pull request #7982: [HUDI-5813] Quarantine table support for error events

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #7982:
URL: https://github.com/apache/hudi/pull/7982#issuecomment-1433046242

   ## CI report:
   
   * a5cad5f60143509cf837f8b01467b660fefa8f77 Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15246) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>




[GitHub] [hudi] codope commented on a diff in pull request #7982: [HUDI-5813] Quarantine table support for error events

Posted by "codope (via GitHub)" <gi...@apache.org>.
codope commented on code in PR #7982:
URL: https://github.com/apache/hudi/pull/7982#discussion_r1109708633


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/QuarantineJsonEvent.java:
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.deltastreamer;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DataTypes;
+
+public class QuarantineJsonEvent extends QuarantineEvent<String> {

Review Comment:
   Why have `Json` in the name? Add Javadocs.





[GitHub] [hudi] hudi-bot commented on pull request #7982: [HUDI-5813] Quarantine table support for error events

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #7982:
URL: https://github.com/apache/hudi/pull/7982#issuecomment-1436864362

   ## CI report:
   
   * 30f2fc136656bbe0567dd7c0cf15916e0cc06af4 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15278) 
   * 1b159da06fc8f6521ecfb5e047955e0ba3156730 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>




[GitHub] [hudi] hudi-bot commented on pull request #7982: [HUDI-5813] Quarantine table support for error events

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #7982:
URL: https://github.com/apache/hudi/pull/7982#issuecomment-1434769327

   ## CI report:
   
   * a5cad5f60143509cf837f8b01467b660fefa8f77 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15246) 
   * 30f2fc136656bbe0567dd7c0cf15916e0cc06af4 Azure: [PENDING](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15278) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>




[GitHub] [hudi] hudi-bot commented on pull request #7982: [HUDI-5813] Quarantine table support for error events

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #7982:
URL: https://github.com/apache/hudi/pull/7982#issuecomment-1435445116

   ## CI report:
   
   * 30f2fc136656bbe0567dd7c0cf15916e0cc06af4 Azure: [FAILURE](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15278) 
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>




[GitHub] [hudi] lokeshj1703 commented on a diff in pull request #7982: [HUDI-5813] Quarantine table support for error events

Posted by "lokeshj1703 (via GitHub)" <gi...@apache.org>.
lokeshj1703 commented on code in PR #7982:
URL: https://github.com/apache/hudi/pull/7982#discussion_r1111811803


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/QuarantineEvent.java:
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.deltastreamer;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.spark.sql.types.DataType;
+
+public abstract class QuarantineEvent<T> {
+
+  QuarantineReason reason;
+  T payload;
+
+  public abstract DataType getPayloadType();
+
+  public abstract String getPayload();
+
+  public QuarantineReason getReason() {
+    return reason;
+  }
+
+  public abstract GenericRecord getAvroPayload();

Review Comment:
   Addressed in latest commit.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/QuarantineEvent.java:
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.deltastreamer;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.spark.sql.types.DataType;
+
+public abstract class QuarantineEvent<T> {
+
+  QuarantineReason reason;
+  T payload;
+
+  public abstract DataType getPayloadType();

Review Comment:
   Addressed in latest commit.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java:
##########
@@ -100,7 +155,9 @@ public InputBatch<JavaRDD<GenericRecord>> fetchNewDataInAvroFormat(Option<String
   public InputBatch<Dataset<Row>> fetchNewDataInRowFormat(Option<String> lastCkptStr, long sourceLimit) {
     switch (source.getSourceType()) {
       case ROW:
-        return ((Source<Dataset<Row>>) source).fetchNext(lastCkptStr, sourceLimit);
+        InputBatch<Dataset<Row>> datasetInputBatch = ((Source<Dataset<Row>>) source).fetchNext(lastCkptStr, sourceLimit);
+        return new InputBatch<>(transformDatasetWithQuarantineEvents(datasetInputBatch.getBatch()),

Review Comment:
   Renamed to 'processQuarantineEvents'. Please let me know if it doesn't sound good.





[GitHub] [hudi] lokeshj1703 commented on a diff in pull request #7982: [HUDI-5813] Error table support for error events

Posted by "lokeshj1703 (via GitHub)" <gi...@apache.org>.
lokeshj1703 commented on code in PR #7982:
URL: https://github.com/apache/hudi/pull/7982#discussion_r1118694815


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/BaseErrorTableWriter.java:
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.deltastreamer;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.VisibleForTesting;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SparkSession;
+
+/**
+ * The class which handles error events when delta streamer syncs data from source. All the
+ * records which Delta streamer is not able to process are triggered as error events to

Review Comment:
   Addressed in latest commit.





[GitHub] [hudi] lokeshj1703 commented on a diff in pull request #7982: [HUDI-5813] Error table support for error events

Posted by "lokeshj1703 (via GitHub)" <gi...@apache.org>.
lokeshj1703 commented on code in PR #7982:
URL: https://github.com/apache/hudi/pull/7982#discussion_r1118695585


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java:
##########
@@ -51,10 +62,56 @@ public final class SourceFormatAdapter implements Closeable {
 
   private final Source source;
 
+  private Option<BaseErrorTableWriter> errorTableWriterInterface = Option.empty();

Review Comment:
   Addressed in latest commit.





[GitHub] [hudi] lokeshj1703 commented on a diff in pull request #7982: [HUDI-5813] Error table support for error events

Posted by "lokeshj1703 (via GitHub)" <gi...@apache.org>.
lokeshj1703 commented on code in PR #7982:
URL: https://github.com/apache/hudi/pull/7982#discussion_r1118703439


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java:
##########
@@ -710,6 +765,26 @@ private Pair<Option<String>, JavaRDD<WriteStatus>> writeToSink(JavaRDD<HoodieRec
             + totalErrorRecords + "/" + totalRecords);
       }
       String commitActionType = CommitUtils.getCommitActionType(cfg.operation, HoodieTableType.valueOf(cfg.tableType));

Review Comment:
   Created https://issues.apache.org/jira/browse/HUDI-5858 for it.





[GitHub] [hudi] hudi-bot commented on pull request #7982: [HUDI-5813] Error table support for error events

Posted by "hudi-bot (via GitHub)" <gi...@apache.org>.
hudi-bot commented on PR #7982:
URL: https://github.com/apache/hudi/pull/7982#issuecomment-1447575115

   ## CI report:
   
   * 5e157e5bd199cc6e4828f00cb4b81d095eee3e5a UNKNOWN
   * dfadbc7d9c1b499d518a6af3d8b78cf169e3a0c6 UNKNOWN
   * ab98a8996106108a6afad74de71c69f229cd550d Azure: [SUCCESS](https://dev.azure.com/apache-hudi-ci-org/785b6ef4-2f42-4a89-8f0e-5f0d7039a0cc/_build/results?buildId=15442) 
   * 242735ddb5c277194fc7a6ea42b8dd0ca9bbe594 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     @hudi-bot supports the following commands:
   
    - `@hudi-bot run azure` re-run the last Azure build
   </details>

