You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by vi...@apache.org on 2019/05/13 17:47:40 UTC

[incubator-hudi] branch master updated: Fix various errors found by long running delta-streamer tests 1. Parquet Avro schema mismatch errors when ingesting are sometimes silently ignored due to race-condition in BoundedInMemoryExecutor. This was reproducible when running long-running delta-streamer with wrong schema and it caused data-loss 2. Fix behavior of Delta-Streamer to error out by default if there are any error records 3. Fix a bug in tracking write errors in WriteStats. Earlier the write errors were tracking sampled errors as opposed to total errors. 4. Delta Streamer does not commit the changes done as part of inline compaction as auto-commit is force disabled. Fix this behavior to always auto-commit inline compaction as it would not otherwise commit.

This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 9cce9ab  Fix various errors found by long running delta-streamer tests   1. Parquet Avro schema mismatch errors when ingesting are sometimes silently ignored due to race-condition in BoundedInMemoryExecutor. This was reproducible when running long-running delta-streamer with wrong schema and it caused data-loss   2. Fix behavior of Delta-Streamer to error out by default if there are any error records   3. Fix a bug in tracking write errors in WriteStats. Earlier the write errors were tracking sampled errors as opposed to total errors.   4. Delta Streamer does not commit the changes done as part of inline compaction as auto-commit is force disabled. Fix this behavior to always auto-commit inline compaction as it would not otherwise commit.
9cce9ab is described below

commit 9cce9abf4de340c095760431301d369e364c5b34
Author: Balaji Varadarajan <va...@uber.com>
AuthorDate: Sun May 12 11:57:04 2019 -0700

    Fix various errors found by long running delta-streamer tests
      1. Parquet Avro schema mismatch errors when ingesting are sometimes silently ignored due to race-condition in BoundedInMemoryExecutor. This was reproducible when running long-running delta-streamer with wrong schema and it caused data-loss
      2. Fix behavior of Delta-Streamer to error out by default if there are any error records
      3. Fix a bug in tracking write errors in WriteStats. Earlier the write errors were tracking sampled errors as opposed to total errors.
      4. Delta Streamer does not commit the changes done as part of inline compaction as auto-commit is force disabled. Fix this behavior to always auto-commit inline compaction as it would not otherwise commit.
---
 .../java/com/uber/hoodie/HoodieWriteClient.java    |  3 +-
 .../src/main/java/com/uber/hoodie/WriteStatus.java |  3 +-
 .../com/uber/hoodie/io/HoodieAppendHandle.java     |  2 +-
 .../com/uber/hoodie/io/HoodieCreateHandle.java     |  2 +-
 .../java/com/uber/hoodie/io/HoodieMergeHandle.java |  2 +-
 .../common/util/queue/BoundedInMemoryExecutor.java |  2 +-
 .../common/util/queue/BoundedInMemoryQueue.java    |  3 ++
 .../deltastreamer/HoodieDeltaStreamer.java         | 55 +++++++++++++++-------
 8 files changed, 49 insertions(+), 23 deletions(-)

diff --git a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java
index 632c13d..ff46794 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java
@@ -1348,7 +1348,8 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
     Optional<String> compactionInstantTimeOpt = scheduleCompaction(extraMetadata);
     compactionInstantTimeOpt.ifPresent(compactionInstantTime -> {
       try {
-        compact(compactionInstantTime);
+        // inline compaction should auto commit as the user is never given control
+        compact(compactionInstantTime, true);
       } catch (IOException ioe) {
         throw new HoodieIOException(ioe.getMessage(), ioe);
       }
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/WriteStatus.java b/hoodie-client/src/main/java/com/uber/hoodie/WriteStatus.java
index 54b32f0..98c7e78 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/WriteStatus.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/WriteStatus.java
@@ -89,7 +89,8 @@ public class WriteStatus implements Serializable {
    * HoodieRecord} before deflation.
    */
   public void markFailure(HoodieRecord record, Throwable t, Optional<Map<String, String>> optionalRecordMetadata) {
-    if (random.nextDouble() <= failureFraction) {
+    if (failedRecords.isEmpty() || (random.nextDouble() <= failureFraction)) {
+      // Guaranteed to have at-least one error
       failedRecords.add(record);
       errors.put(record.getKey(), t);
     }
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java
index 7cd2267..17f4d26 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java
@@ -251,7 +251,7 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieIOH
       writeStatus.getStat().setNumInserts(insertRecordsWritten);
       writeStatus.getStat().setNumDeletes(recordsDeleted);
       writeStatus.getStat().setTotalWriteBytes(estimatedNumberOfBytesWritten);
-      writeStatus.getStat().setTotalWriteErrors(writeStatus.getFailedRecords().size());
+      writeStatus.getStat().setTotalWriteErrors(writeStatus.getTotalErrorRecords());
       RuntimeStats runtimeStats = new RuntimeStats();
       runtimeStats.setTotalUpsertTime(timer.endTimer());
       writeStatus.getStat().setRuntimeStats(runtimeStats);
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCreateHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCreateHandle.java
index 5ff3561..0a22865 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCreateHandle.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCreateHandle.java
@@ -166,7 +166,7 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieIOH
       stat.setFileId(writeStatus.getFileId());
       stat.setPaths(new Path(config.getBasePath()), path, tempPath);
       stat.setTotalWriteBytes(FSUtils.getFileSize(fs, getStorageWriterPath()));
-      stat.setTotalWriteErrors(writeStatus.getFailedRecords().size());
+      stat.setTotalWriteErrors(writeStatus.getTotalErrorRecords());
       RuntimeStats runtimeStats = new RuntimeStats();
       runtimeStats.setTotalCreateTime(timer.endTimer());
       stat.setRuntimeStats(runtimeStats);
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java
index 3beb97d..4c3bfa8 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java
@@ -281,7 +281,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieIOHa
       writeStatus.getStat().setNumDeletes(recordsDeleted);
       writeStatus.getStat().setNumUpdateWrites(updatedRecordsWritten);
       writeStatus.getStat().setNumInserts(insertRecordsWritten);
-      writeStatus.getStat().setTotalWriteErrors(writeStatus.getFailedRecords().size());
+      writeStatus.getStat().setTotalWriteErrors(writeStatus.getTotalErrorRecords());
       RuntimeStats runtimeStats = new RuntimeStats();
       runtimeStats.setTotalUpsertTime(timer.endTimer());
       writeStatus.getStat().setRuntimeStats(runtimeStats);
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/queue/BoundedInMemoryExecutor.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/queue/BoundedInMemoryExecutor.java
index add9200..5765ea9 100644
--- a/hoodie-common/src/main/java/com/uber/hoodie/common/util/queue/BoundedInMemoryExecutor.java
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/queue/BoundedInMemoryExecutor.java
@@ -93,7 +93,7 @@ public class BoundedInMemoryExecutor<I, O, E> {
           preExecute();
           producer.produce(queue);
         } catch (Exception e) {
-          logger.error("error consuming records", e);
+          logger.error("error producing records", e);
           queue.markAsFailed(e);
           throw e;
         } finally {
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/queue/BoundedInMemoryQueue.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/queue/BoundedInMemoryQueue.java
index 401924e..3792233 100644
--- a/hoodie-common/src/main/java/com/uber/hoodie/common/util/queue/BoundedInMemoryQueue.java
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/queue/BoundedInMemoryQueue.java
@@ -207,6 +207,9 @@ public class BoundedInMemoryQueue<I, O> implements Iterable<O> {
         throw new HoodieException(e);
       }
     }
+    // Check one more time here as it is possible producer errored out and closed immediately
+    throwExceptionIfFailed();
+
     if (newRecord != null && newRecord.isPresent()) {
       return newRecord;
     } else {
diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java
index 68621c4..e6e8417 100644
--- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java
+++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java
@@ -276,24 +276,42 @@ public class HoodieDeltaStreamer implements Serializable {
       throw new HoodieDeltaStreamerException("Unknown operation :" + cfg.operation);
     }
 
-    // Simply commit for now. TODO(vc): Support better error handlers later on
-    HashMap<String, String> checkpointCommitMetadata = new HashMap<>();
-    checkpointCommitMetadata.put(CHECKPOINT_KEY, checkpointStr);
-
-    boolean success = client.commit(commitTime, writeStatusRDD,
-        Optional.of(checkpointCommitMetadata));
-    if (success) {
-      log.info("Commit " + commitTime + " successful!");
-      // TODO(vc): Kick off hive sync from here.
+    long totalErrorRecords = writeStatusRDD.mapToDouble(ws -> ws.getTotalErrorRecords()).sum().longValue();
+    long totalRecords = writeStatusRDD.mapToDouble(ws -> ws.getTotalRecords()).sum().longValue();
+    boolean hasErrors = totalErrorRecords > 0;
+    long hiveSyncTimeMs = 0;
+    if (!hasErrors || cfg.commitOnErrors) {
+      HashMap<String, String> checkpointCommitMetadata = new HashMap<>();
+      checkpointCommitMetadata.put(CHECKPOINT_KEY, checkpointStr);
+
+      if (hasErrors) {
+        log.warn("Some records failed to be merged but forcing commit since commitOnErrors set. Errors/Total="
+            + totalErrorRecords + "/" + totalRecords);
+      }
+
+      boolean success = client.commit(commitTime, writeStatusRDD,
+          Optional.of(checkpointCommitMetadata));
+      if (success) {
+        log.info("Commit " + commitTime + " successful!");
+        // Sync to hive if enabled
+        Timer.Context hiveSyncContext = metrics.getHiveSyncTimerContext();
+        syncHive();
+        hiveSyncTimeMs = hiveSyncContext != null ? hiveSyncContext.stop() : 0;
+      } else {
+        log.info("Commit " + commitTime + " failed!");
+      }
     } else {
-      log.info("Commit " + commitTime + " failed!");
+      log.error("There are errors when ingesting records. Errors/Total="
+          + totalErrorRecords + "/" + totalRecords);
+      log.error("Printing out the top 100 errors");
+      writeStatusRDD.filter(ws -> ws.hasErrors()).take(100).forEach(ws -> {
+        log.error("Global error :", ws.getGlobalError());
+        if (ws.getErrors().size() > 0) {
+          ws.getErrors().entrySet().forEach(r ->
+              log.trace("Error for key:" + r.getKey() + " is " + r.getValue()));
+        }
+      });
     }
-
-    // Sync to hive if enabled
-    Timer.Context hiveSyncContext = metrics.getHiveSyncTimerContext();
-    syncHive();
-    long hiveSyncTimeMs = hiveSyncContext != null ? hiveSyncContext.stop() : 0;
-
     client.close();
     long overallTimeMs = overallTimerContext != null ? overallTimerContext.stop() : 0;
 
@@ -347,7 +365,7 @@ public class HoodieDeltaStreamer implements Serializable {
     UPSERT, INSERT, BULK_INSERT
   }
 
-  private class OperationConvertor implements IStringConverter<Operation> {
+  private static class OperationConvertor implements IStringConverter<Operation> {
     @Override
     public Operation convert(String value) throws ParameterException {
       return Operation.valueOf(value);
@@ -432,6 +450,9 @@ public class HoodieDeltaStreamer implements Serializable {
     @Parameter(names = {"--spark-master"}, description = "spark master to use.")
     public String sparkMaster = "local[2]";
 
+    @Parameter(names = {"--commit-on-errors"})
+    public Boolean commitOnErrors = false;
+
     @Parameter(names = {"--help", "-h"}, help = true)
     public Boolean help = false;
   }