Posted to dev@gobblin.apache.org by GitBox <gi...@apache.org> on 2021/04/14 16:54:29 UTC

[GitHub] [gobblin] autumnust commented on a change in pull request #3255: [GOBBLIN-1419]Error handling for compaction pipeline on GMCE emitted error

autumnust commented on a change in pull request #3255:
URL: https://github.com/apache/gobblin/pull/3255#discussion_r613395422



##########
File path: gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionThresholdVerifier.java
##########
@@ -81,10 +84,15 @@ public Result verify(FileSystemDataset dataset) {
         newRecords = helper.calculateRecordCount(Lists.newArrayList(new Path(dataset.datasetURN())));
       }
       double oldRecords = helper.readRecordCount(new Path(result.getDstAbsoluteDir()));
-
+      State datasetState = helper.loadState(new Path(result.getDstAbsoluteDir()));
       if (oldRecords == 0) {
         return new Result(true, "");
       }
+      if(state.getPropAsBoolean(ConfigurationKeys.GOBBLIN_METADATA_CHANGE_EVENT_ENABLED, false)) {

Review comment:
       These two branches could be merged.

##########
File path: gobblin-compaction/src/main/java/org/apache/gobblin/compaction/verify/CompactionThresholdVerifier.java
##########
@@ -60,7 +62,8 @@ public CompactionThresholdVerifier(State state) {
    * dataset. To avoid scalability issue, we choose a stateless approach where each dataset tracks
    * record count by themselves and persist it in the file system)
    *
-   * @return true iff the difference exceeds the threshold or this is the first time compaction
+   * @return true if the difference exceeds the threshold or this is the first time compaction or

Review comment:
       I am not sure it is a good idea to package the GMCE-emission check inside the threshold verifier (`CompactionThresholdVerifier`). What would be the cost of a separate verifier implementation?
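
A minimal sketch of what a separate verifier could look like, to make the trade-off concrete. Every type here (`CheckResult`, `RecompactionCheck`, `RecordCountThresholdCheck`, `GmceEmittedCheck`, `AnyOfCheck`) and the property keys `old.records` / `new.records` are hypothetical stand-ins, not Gobblin classes or keys; only the `GMCE.emitted` key mirrors `GMCE_EMITTED_KEY` from this PR. Combining the checks with "any of" and treating a missing flag as already emitted are both assumptions.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

/** Hypothetical stand-in for a verifier result; not the Gobblin Result class. */
final class CheckResult {
  final boolean shouldCompact;
  final String reason;

  CheckResult(boolean shouldCompact, String reason) {
    this.shouldCompact = shouldCompact;
    this.reason = reason;
  }
}

/** Hypothetical single-purpose check over a dataset's persisted state. */
interface RecompactionCheck {
  CheckResult check(Properties datasetState);
}

/** Knows only about record counts. */
final class RecordCountThresholdCheck implements RecompactionCheck {
  private final double threshold;

  RecordCountThresholdCheck(double threshold) {
    this.threshold = threshold;
  }

  @Override
  public CheckResult check(Properties state) {
    double oldRecords = Double.parseDouble(state.getProperty("old.records", "0"));
    double newRecords = Double.parseDouble(state.getProperty("new.records", "0"));
    if (oldRecords == 0) {
      return new CheckResult(true, "first compaction");
    }
    boolean exceeded = (newRecords - oldRecords) / oldRecords > threshold;
    return new CheckResult(exceeded, exceeded ? "threshold exceeded" : "below threshold");
  }
}

/** Knows only about the GMCE flag. A missing flag is assumed to mean "already emitted". */
final class GmceEmittedCheck implements RecompactionCheck {
  @Override
  public CheckResult check(Properties state) {
    boolean emitted = Boolean.parseBoolean(state.getProperty("GMCE.emitted", "true"));
    return new CheckResult(!emitted, emitted ? "GMCE already emitted" : "GMCE not emitted");
  }
}

/** Assumed composition: recompact as soon as any check asks for it. */
final class AnyOfCheck implements RecompactionCheck {
  private final List<RecompactionCheck> checks;

  AnyOfCheck(RecompactionCheck... checks) {
    this.checks = Arrays.asList(checks);
  }

  @Override
  public CheckResult check(Properties state) {
    for (RecompactionCheck c : checks) {
      CheckResult r = c.check(state);
      if (r.shouldCompact) {
        return r;
      }
    }
    return new CheckResult(false, "no check requested recompaction");
  }
}
```

With this split, the threshold logic stays unaware of GMCE, and the extra cost is one small class plus an explicit decision about how the two results are combined.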

##########
File path: gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionGMCEPublishingAction.java
##########
@@ -57,9 +58,11 @@
 
   public static final String ICEBERG_ID_ATTRIBUTE = "iceberg.id";
   public static final String ICEBERG_REQUIRED_ATTRIBUTE = "iceberg.required";
+  public final static String GMCE_EMITTED_KEY = "GMCE.emitted";
   private final State state;
   private final CompactionJobConfigurator configurator;
   private final Configuration conf;
+  private InputRecordCountHelper helper;

Review comment:
       This class may deserve a better name, given that it no longer deals only with record counts.

##########
File path: gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionGMCEPublishingAction.java
##########
@@ -57,9 +58,11 @@
 
   public static final String ICEBERG_ID_ATTRIBUTE = "iceberg.id";
   public static final String ICEBERG_REQUIRED_ATTRIBUTE = "iceberg.required";
+  public final static String GMCE_EMITTED_KEY = "GMCE.emitted";
   private final State state;
   private final CompactionJobConfigurator configurator;
   private final Configuration conf;
+  private InputRecordCountHelper helper;

Review comment:
       Second comment on this line: `InputRecordCountHelper` is used widely across the different implementations of `CompactionAction`. Does it make sense to share a single instance instead of instantiating the same object in each implementation?
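
One way to share the helper, sketched with constructor injection. `SharedStateHelper`, `SketchAction`, the two action classes, and `SketchActionFactory` are hypothetical stand-ins for illustration only; they do not reflect how Gobblin actually constructs its `CompactionAction` implementations.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for a helper such as InputRecordCountHelper. */
final class SharedStateHelper {
}

/** Hypothetical action interface that exposes the helper it was given. */
interface SketchAction {
  SharedStateHelper helper();
}

/** Receives the helper instead of constructing its own. */
final class FileOperationSketchAction implements SketchAction {
  private final SharedStateHelper helper;

  FileOperationSketchAction(SharedStateHelper helper) {
    this.helper = helper;
  }

  @Override
  public SharedStateHelper helper() {
    return helper;
  }
}

/** Receives the same helper instance as the other actions. */
final class GmcePublishingSketchAction implements SketchAction {
  private final SharedStateHelper helper;

  GmcePublishingSketchAction(SharedStateHelper helper) {
    this.helper = helper;
  }

  @Override
  public SharedStateHelper helper() {
    return helper;
  }
}

/** Builds the helper once and hands it to every action. */
final class SketchActionFactory {
  static List<SketchAction> createActions() {
    SharedStateHelper shared = new SharedStateHelper();
    return Arrays.<SketchAction>asList(
        new FileOperationSketchAction(shared),
        new GmcePublishingSketchAction(shared));
  }
}
```

Passing the helper in also makes it easy to substitute a stub in unit tests, which is harder when each action instantiates its own copy.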

##########
File path: gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
##########
@@ -167,6 +167,10 @@ public void onCompactionJobComplete(FileSystemDataset dataset) throws IOExceptio
         compactionState.setProp(DUPLICATE_COUNT_TOTAL + Long.toString(executionCount),
             compactionState.getProp(DUPLICATE_COUNT_TOTAL, "null"));
       }
+      if(state.getPropAsBoolean(ConfigurationKeys.GOBBLIN_METADATA_CHANGE_EVENT_ENABLED, false)) {

Review comment:
       Please consider using auto-formatting to clean up the spacing after reserved keywords (for example, `if(` should be `if (`).



