You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by le...@apache.org on 2020/06/18 20:37:07 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-1201] Add dataset.urn in GTE for MRCompactionTask

This is an automated email from the ASF dual-hosted git repository.

lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 24372a3  [GOBBLIN-1201] Add dataset.urn in GTE for MRCompactionTask
24372a3 is described below

commit 24372a308006bf0b3933f026bd3c452317e30761
Author: Lei Sun <au...@gmail.com>
AuthorDate: Thu Jun 18 13:36:51 2020 -0700

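    [GOBBLIN-1201] Add dataset.urn in GTE for MRCompactionTask
    
    Add dataset.urn in GTE for MRCompactionTask by setting it on every work
    unit created by CompactionSource, including the failed-task work units.
    
    Revert the change of default dataset.urn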
    
    Closes #3048 from autumnust/populatedatasetUrn
---
 .../apache/gobblin/compaction/source/CompactionSource.java  | 13 ++++++++-----
 .../compaction/mapreduce/AvroCompactionTaskTest.java        |  2 +-
 2 files changed, 9 insertions(+), 6 deletions(-)

diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/source/CompactionSource.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/source/CompactionSource.java
index 35fe53d..ae3b124 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/source/CompactionSource.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/source/CompactionSource.java
@@ -151,7 +151,7 @@ public class CompactionSource implements WorkUnitStreamSource<String, String> {
       this.workUnitIterator = workUnitIterator;
     }
 
-    public void run () {
+    public void run() {
       try {
         Stopwatch stopwatch = Stopwatch.createStarted();
         int threads = this.state.getPropAsInt(CompactionVerifier.COMPACTION_VERIFICATION_THREADS, 5);
@@ -205,7 +205,7 @@ public class CompactionSource implements WorkUnitStreamSource<String, String> {
           for (Dataset dataset: datasets) {
             log.info ("{} is timed out and give up the verification, adding a failed task", dataset.datasetURN());
             // create failed task for these failed datasets
-            this.workUnitIterator.addWorkUnit (createWorkUnitForFailure(dataset, failedReasonMap.get(dataset.getUrn())));
+            this.workUnitIterator.addWorkUnit(createWorkUnitForFailure(dataset, failedReasonMap.get(dataset.getUrn())));
           }
         }
 
@@ -316,11 +316,11 @@ public class CompactionSource implements WorkUnitStreamSource<String, String> {
      * {@link VerifiedDataset} wraps original {@link Dataset} because if verification failed, we are able get original
      * datasets and restart the entire process of verification against those failed datasets.
      */
-    public VerifiedDataset call () throws DatasetVerificationException {
+    public VerifiedDataset call() throws DatasetVerificationException {
       try {
         VerifiedResult result = this.verify(dataset);
         if (result.allVerificationPassed) {
-          this.workUnitIterator.addWorkUnit (createWorkUnit(dataset));
+          this.workUnitIterator.addWorkUnit(createWorkUnit(dataset));
         }
         return new VerifiedDataset(dataset, result);
       } catch (Exception e) {
@@ -422,16 +422,18 @@ public class CompactionSource implements WorkUnitStreamSource<String, String> {
     }
   }
 
-  protected WorkUnit createWorkUnit (Dataset dataset) throws IOException {
+  protected WorkUnit createWorkUnit(Dataset dataset) throws IOException {
     WorkUnit workUnit = new WorkUnit();
     TaskUtils.setTaskFactoryClass(workUnit, MRCompactionTaskFactory.class);
     suite.save(dataset, workUnit);
+    workUnit.setProp(ConfigurationKeys.DATASET_URN_KEY, dataset.getUrn());
     return workUnit;
   }
 
   protected WorkUnit createWorkUnitForFailure (Dataset dataset) throws IOException {
     WorkUnit workUnit = new FailedTask.FailedWorkUnit();
     TaskUtils.setTaskFactoryClass(workUnit, CompactionFailedTask.CompactionFailedTaskFactory.class);
+    workUnit.setProp(ConfigurationKeys.DATASET_URN_KEY, dataset.getUrn());
     suite.save(dataset, workUnit);
     return workUnit;
   }
@@ -440,6 +442,7 @@ public class CompactionSource implements WorkUnitStreamSource<String, String> {
     WorkUnit workUnit = new FailedTask.FailedWorkUnit();
     workUnit.setProp(CompactionVerifier.COMPACTION_VERIFICATION_FAIL_REASON, reason);
     TaskUtils.setTaskFactoryClass(workUnit, CompactionFailedTask.CompactionFailedTaskFactory.class);
+    workUnit.setProp(ConfigurationKeys.DATASET_URN_KEY, dataset.getUrn());
     suite.save(dataset, workUnit);
     return workUnit;
   }
diff --git a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/AvroCompactionTaskTest.java b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/AvroCompactionTaskTest.java
index 048fe47..fb53a5c 100644
--- a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/AvroCompactionTaskTest.java
+++ b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/AvroCompactionTaskTest.java
@@ -266,7 +266,7 @@ public class AvroCompactionTaskTest {
   }
 
   @Test
-   public void testWorkUnitStream () throws Exception {
+   public void testWorkUnitStream() throws Exception {
      File basePath = Files.createTempDir();
      basePath.deleteOnExit();
      GenericRecord r1 = createRandomRecord();