Posted to dev@gobblin.apache.org by GitBox <gi...@apache.org> on 2021/11/04 01:19:15 UTC

[GitHub] [gobblin] ZihanLi58 commented on a change in pull request #3419: [GOBBLIN-1565]Make GMCEWriter fault tolerant so that one topic failure will not affect other topics in the same container

ZihanLi58 commented on a change in pull request #3419:
URL: https://github.com/apache/gobblin/pull/3419#discussion_r742452232



##########
File path: gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
##########
@@ -248,20 +272,88 @@ public void writeEnvelope(RecordEnvelope<GenericRecord> recordEnvelope) throws I
       String dbName = spec.getTable().getDbName();
       String tableName = spec.getTable().getTableName();
       String tableString = Joiner.on(TABLE_NAME_DELIMITER).join(dbName, tableName);
-      if (tableOperationTypeMap.containsKey(tableString)
-          && tableOperationTypeMap.get(tableString) != gmce.getOperationType()) {
-        for (MetadataWriter writer : metadataWriters) {
-          writer.flush(dbName, tableName);
-        }
-      }
-      tableOperationTypeMap.put(tableString, gmce.getOperationType());
-      for (MetadataWriter writer : metadataWriters) {
-        writer.writeEnvelope(recordEnvelope, newSpecsMap, oldSpecsMap, spec);
+      if (!tableOperationTypeMap.containsKey(tableString)) {
+        tableOperationTypeMap.put(tableString, new TableStatus(gmce.getOperationType(),
+            gmce.getDatasetIdentifier().getNativeName(), watermark.getTopicPartition().getTopicName(), watermark.getTopicPartition().getId(),
+            watermark.getLwm().getValue()-1, watermark.getLwm().getValue()));
+      } else if (tableOperationTypeMap.get(tableString).operationType != gmce.getOperationType()) {
+        flush(dbName, tableName);
+        tableOperationTypeMap.put(tableString, new TableStatus(gmce.getOperationType(),
+            gmce.getDatasetIdentifier().getNativeName(), watermark.getTopicPartition().getTopicName(), watermark.getTopicPartition().getId(),
+            watermark.getLwm().getValue()-1, watermark.getLwm().getValue()));
       }
+      tableOperationTypeMap.get(tableString).gmceHighWatermark = watermark.getLwm().getValue();
+      write(recordEnvelope, newSpecsMap, oldSpecsMap, spec);
     }
     this.recordCount.incrementAndGet();
   }
 
+  // Add fault tolerance and make sure we can emit GTEs as desired
+  private void write(RecordEnvelope recordEnvelope, ConcurrentHashMap newSpecsMap, ConcurrentHashMap oldSpecsMap, HiveSpec spec) throws IOException {
+    boolean meetException = false;
+    String dbName = spec.getTable().getDbName();
+    String tableName = spec.getTable().getTableName();
+    String tableString = Joiner.on(TABLE_NAME_DELIMITER).join(dbName, tableName);
+    for (MetadataWriter writer : metadataWriters) {
+      if (meetException) {
+        writer.reset(dbName, tableName);
+      } else {
+        try {
+          writer.writeEnvelope(recordEnvelope, newSpecsMap, oldSpecsMap, spec);
+        } catch (Exception e) {
+          meetException = true;
+          addException(e, tableString, dbName, tableName);
+        }
+      }
+    }
+  }
+  private void addException(Exception e, String tableString, String dbName, String tableName) throws IOException {
+    TableStatus tableStatus = tableOperationTypeMap.get(tableString);
+    Map<String, Queue<MetadataFlushException>> tableErrorMap = this.datasetErrorMap.getOrDefault(tableStatus.datasetName, new HashMap<>());
+    Queue<MetadataFlushException> errors = tableErrorMap.getOrDefault(tableString, new LinkedList<>());
+    if (!errors.isEmpty() && errors.peek().highWatermark == tableStatus.gmceLowWatermark) {
+      errors.peek().highWatermark = tableStatus.gmceHighWatermark;
+    } else {
+      MetadataFlushException metadataFlushException =
+          new MetadataFlushException(tableStatus.datasetName, dbName, tableName, tableStatus.gmceTopicName, tableStatus.gmceTopicPartition, tableStatus.gmceLowWatermark, tableStatus.gmceHighWatermark, e);
+      errors.offer(metadataFlushException);
+    }
+    tableErrorMap.put(tableString, errors);
+    this.datasetErrorMap.put(tableStatus.datasetName, tableErrorMap);
+    log.error(String.format("Met exception when flushing table %s", tableString), e);
+    if (datasetErrorMap.size() > maxErrorDataset) {
+      // Fail the job if the number of failing datasets exceeds the limit
+      throw new IOException(String.format("Container fails to flush for more than %s datasets", maxErrorDataset));

Review comment:
       I print out the exception, and we also emit a GTE for those exceptions. But I will update the thrown exception to contain the last exception we met.
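
       A minimal Java sketch of what I mean, assuming a hypothetical lastException field (not the final PR code):

           // Remember the most recent writer failure so the job-failing
           // IOException carries the root cause instead of a bare message.
           private Exception lastException; // hypothetical field, set in addException()

           private void failIfTooManyErrors() throws IOException {
             if (datasetErrorMap.size() > maxErrorDataset) {
               throw new IOException(
                   String.format("Container fails to flush for more than %s datasets", maxErrorDataset),
                   lastException); // chain the last exception we met
             }
           }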

##########
File path: gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
##########
@@ -248,20 +272,88 @@ public void writeEnvelope(RecordEnvelope<GenericRecord> recordEnvelope) throws I
       String dbName = spec.getTable().getDbName();
       String tableName = spec.getTable().getTableName();
       String tableString = Joiner.on(TABLE_NAME_DELIMITER).join(dbName, tableName);
-      if (tableOperationTypeMap.containsKey(tableString)
-          && tableOperationTypeMap.get(tableString) != gmce.getOperationType()) {
-        for (MetadataWriter writer : metadataWriters) {
-          writer.flush(dbName, tableName);
-        }
-      }
-      tableOperationTypeMap.put(tableString, gmce.getOperationType());
-      for (MetadataWriter writer : metadataWriters) {
-        writer.writeEnvelope(recordEnvelope, newSpecsMap, oldSpecsMap, spec);
+      if (!tableOperationTypeMap.containsKey(tableString)) {
+        tableOperationTypeMap.put(tableString, new TableStatus(gmce.getOperationType(),
+            gmce.getDatasetIdentifier().getNativeName(), watermark.getTopicPartition().getTopicName(), watermark.getTopicPartition().getId(),
+            watermark.getLwm().getValue()-1, watermark.getLwm().getValue()));
+      } else if (tableOperationTypeMap.get(tableString).operationType != gmce.getOperationType()) {
+        flush(dbName, tableName);
+        tableOperationTypeMap.put(tableString, new TableStatus(gmce.getOperationType(),
+            gmce.getDatasetIdentifier().getNativeName(), watermark.getTopicPartition().getTopicName(), watermark.getTopicPartition().getId(),
+            watermark.getLwm().getValue()-1, watermark.getLwm().getValue()));
       }
+      tableOperationTypeMap.get(tableString).gmceHighWatermark = watermark.getLwm().getValue();
+      write(recordEnvelope, newSpecsMap, oldSpecsMap, spec);
     }
     this.recordCount.incrementAndGet();
   }
 
+  // Add fault tolerance and make sure we can emit GTEs as desired
+  private void write(RecordEnvelope recordEnvelope, ConcurrentHashMap newSpecsMap, ConcurrentHashMap oldSpecsMap, HiveSpec spec) throws IOException {
+    boolean meetException = false;
+    String dbName = spec.getTable().getDbName();
+    String tableName = spec.getTable().getTableName();
+    String tableString = Joiner.on(TABLE_NAME_DELIMITER).join(dbName, tableName);
+    for (MetadataWriter writer : metadataWriters) {
+      if (meetException) {
+        writer.reset(dbName, tableName);
+      } else {
+        try {
+          writer.writeEnvelope(recordEnvelope, newSpecsMap, oldSpecsMap, spec);
+        } catch (Exception e) {
+          meetException = true;
+          addException(e, tableString, dbName, tableName);
+        }
+      }
+    }
+  }
+  private void addException(Exception e, String tableString, String dbName, String tableName) throws IOException {
+    TableStatus tableStatus = tableOperationTypeMap.get(tableString);
+    Map<String, Queue<MetadataFlushException>> tableErrorMap = this.datasetErrorMap.getOrDefault(tableStatus.datasetName, new HashMap<>());
+    Queue<MetadataFlushException> errors = tableErrorMap.getOrDefault(tableString, new LinkedList<>());
+    if (!errors.isEmpty() && errors.peek().highWatermark == tableStatus.gmceLowWatermark) {
+      errors.peek().highWatermark = tableStatus.gmceHighWatermark;
+    } else {
+      MetadataFlushException metadataFlushException =
+          new MetadataFlushException(tableStatus.datasetName, dbName, tableName, tableStatus.gmceTopicName, tableStatus.gmceTopicPartition, tableStatus.gmceLowWatermark, tableStatus.gmceHighWatermark, e);
+      errors.offer(metadataFlushException);
+    }
+    tableErrorMap.put(tableString, errors);
+    this.datasetErrorMap.put(tableStatus.datasetName, tableErrorMap);
+    log.error(String.format("Met exception when flushing table %s", tableString), e);
+    if (datasetErrorMap.size() > maxErrorDataset) {
+      // Fail the job if the number of failing datasets exceeds the limit
+      throw new IOException(String.format("Container fails to flush for more than %s datasets", maxErrorDataset));
+    }
+    tableStatus.gmceLowWatermark = tableStatus.gmceHighWatermark;
+  }
+
+  // Add fault tolerance and make sure we can emit GTEs as desired
+  private void flush(String dbName, String tableName) throws IOException {
+    boolean meetException = false;
+    String tableString = Joiner.on(TABLE_NAME_DELIMITER).join(dbName, tableName);
+    if (tableOperationTypeMap.get(tableString).gmceLowWatermark == tableOperationTypeMap.get(tableString).gmceHighWatermark) {
+      // No need to flush
+      return;
+    }
+    for (MetadataWriter writer : metadataWriters) {
+      if (meetException) {
+        writer.reset(dbName, tableName);
+      } else {
+        try {
+          writer.flush(dbName, tableName);
+        } catch (IOException e) {
+          meetException = true;
+          addException(e, tableString, dbName, tableName);
+        }
+      }
+    }
+    if (!meetException && datasetErrorMap.containsKey(tableOperationTypeMap.get(tableString).datasetName)

Review comment:
       We want to emit a GTE only when the actual watermark moves: either in the flush() method, where the watermark in the state store moves, or when we successfully commit one snapshot so that the high watermark in the Iceberg table property moves. That's why the condition here requires meetException to be false, as that means we successfully committed once for this table after meeting an exception. The other place we emit a GTE is at the end of the flush() method (not flush(dbName, tableName)).
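
       As a rough Java illustration of the check at the end of flush(dbName, tableName), where submitFailureEvents is a hypothetical helper that emits the GTE:

           // Emit a GTE only after a flush that succeeded (meetException is false),
           // i.e. the watermark actually moved, and only if errors were previously
           // recorded for this table.
           String datasetName = tableOperationTypeMap.get(tableString).datasetName;
           if (!meetException && datasetErrorMap.containsKey(datasetName)
               && datasetErrorMap.get(datasetName).containsKey(tableString)) {
             // The failed watermark range is now committed; report it via a GTE
             // and clear the per-table error queue.
             submitFailureEvents(datasetErrorMap.get(datasetName).get(tableString));
             datasetErrorMap.get(datasetName).remove(tableString);
           }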

##########
File path: gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/GobblinMCEWriter.java
##########
@@ -248,20 +272,88 @@ public void writeEnvelope(RecordEnvelope<GenericRecord> recordEnvelope) throws I
       String dbName = spec.getTable().getDbName();
       String tableName = spec.getTable().getTableName();
       String tableString = Joiner.on(TABLE_NAME_DELIMITER).join(dbName, tableName);
-      if (tableOperationTypeMap.containsKey(tableString)
-          && tableOperationTypeMap.get(tableString) != gmce.getOperationType()) {
-        for (MetadataWriter writer : metadataWriters) {
-          writer.flush(dbName, tableName);
-        }
-      }
-      tableOperationTypeMap.put(tableString, gmce.getOperationType());
-      for (MetadataWriter writer : metadataWriters) {
-        writer.writeEnvelope(recordEnvelope, newSpecsMap, oldSpecsMap, spec);
+      if (!tableOperationTypeMap.containsKey(tableString)) {
+        tableOperationTypeMap.put(tableString, new TableStatus(gmce.getOperationType(),
+            gmce.getDatasetIdentifier().getNativeName(), watermark.getTopicPartition().getTopicName(), watermark.getTopicPartition().getId(),
+            watermark.getLwm().getValue()-1, watermark.getLwm().getValue()));
+      } else if (tableOperationTypeMap.get(tableString).operationType != gmce.getOperationType()) {
+        flush(dbName, tableName);
+        tableOperationTypeMap.put(tableString, new TableStatus(gmce.getOperationType(),
+            gmce.getDatasetIdentifier().getNativeName(), watermark.getTopicPartition().getTopicName(), watermark.getTopicPartition().getId(),
+            watermark.getLwm().getValue()-1, watermark.getLwm().getValue()));
       }
+      tableOperationTypeMap.get(tableString).gmceHighWatermark = watermark.getLwm().getValue();
+      write(recordEnvelope, newSpecsMap, oldSpecsMap, spec);
     }
     this.recordCount.incrementAndGet();
   }
 
+  // Add fault tolerance and make sure we can emit GTEs as desired
+  private void write(RecordEnvelope recordEnvelope, ConcurrentHashMap newSpecsMap, ConcurrentHashMap oldSpecsMap, HiveSpec spec) throws IOException {
+    boolean meetException = false;
+    String dbName = spec.getTable().getDbName();
+    String tableName = spec.getTable().getTableName();
+    String tableString = Joiner.on(TABLE_NAME_DELIMITER).join(dbName, tableName);
+    for (MetadataWriter writer : metadataWriters) {
+      if (meetException) {
+        writer.reset(dbName, tableName);
+      } else {
+        try {
+          writer.writeEnvelope(recordEnvelope, newSpecsMap, oldSpecsMap, spec);
+        } catch (Exception e) {
+          meetException = true;
+          addException(e, tableString, dbName, tableName);
+        }
+      }
+    }
+  }
+  private void addException(Exception e, String tableString, String dbName, String tableName) throws IOException {
+    TableStatus tableStatus = tableOperationTypeMap.get(tableString);
+    Map<String, Queue<MetadataFlushException>> tableErrorMap = this.datasetErrorMap.getOrDefault(tableStatus.datasetName, new HashMap<>());
+    Queue<MetadataFlushException> errors = tableErrorMap.getOrDefault(tableString, new LinkedList<>());
+    if (!errors.isEmpty() && errors.peek().highWatermark == tableStatus.gmceLowWatermark) {
+      errors.peek().highWatermark = tableStatus.gmceHighWatermark;
+    } else {
+      MetadataFlushException metadataFlushException =
+          new MetadataFlushException(tableStatus.datasetName, dbName, tableName, tableStatus.gmceTopicName, tableStatus.gmceTopicPartition, tableStatus.gmceLowWatermark, tableStatus.gmceHighWatermark, e);
+      errors.offer(metadataFlushException);
+    }
+    tableErrorMap.put(tableString, errors);
+    this.datasetErrorMap.put(tableStatus.datasetName, tableErrorMap);

Review comment:
       Sorry for the confusion; I was using datasetName to be consistent with the metadata schema, which treats the path as the dataset name. I will rename datasetName to datasetPath to avoid this ambiguity. It's not a 1:1 mapping, as one dataset can be registered into multiple tables.
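
       To make the shape concrete, here is a Java sketch of the error map keyed by dataset path (example values are hypothetical):

           // Outer key: dataset path (the GMCE's native name); inner key: a
           // "db.table" string. One dataset path can map to multiple tables.
           Map<String, Map<String, Queue<MetadataFlushException>>> datasetErrorMap =
               new HashMap<>();

           // Example shape:
           //   "/data/tracking/PageViewEvent" ->
           //       { "db1.page_view"        -> [flush errors...],
           //         "db2.page_view_hourly" -> [flush errors...] }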

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@gobblin.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org