You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by le...@apache.org on 2020/06/18 20:37:07 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-1201] Add
datset.urn in GTE for MRCompactionTask
This is an automated email from the ASF dual-hosted git repository.
lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 24372a3 [GOBBLIN-1201] Add datset.urn in GTE for MRCompactionTask
24372a3 is described below
commit 24372a308006bf0b3933f026bd3c452317e30761
Author: Lei Sun <au...@gmail.com>
AuthorDate: Thu Jun 18 13:36:51 2020 -0700
[GOBBLIN-1201] Add datset.urn in GTE for MRCompactionTask
Add dataset.urn in GTE for MRCompactionTask
Revert the change of default dataset.urn
Closes #3048 from autumnust/populatedatasetUrn
---
.../apache/gobblin/compaction/source/CompactionSource.java | 13 ++++++++-----
.../compaction/mapreduce/AvroCompactionTaskTest.java | 2 +-
2 files changed, 9 insertions(+), 6 deletions(-)
diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/source/CompactionSource.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/source/CompactionSource.java
index 35fe53d..ae3b124 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/source/CompactionSource.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/source/CompactionSource.java
@@ -151,7 +151,7 @@ public class CompactionSource implements WorkUnitStreamSource<String, String> {
this.workUnitIterator = workUnitIterator;
}
- public void run () {
+ public void run() {
try {
Stopwatch stopwatch = Stopwatch.createStarted();
int threads = this.state.getPropAsInt(CompactionVerifier.COMPACTION_VERIFICATION_THREADS, 5);
@@ -205,7 +205,7 @@ public class CompactionSource implements WorkUnitStreamSource<String, String> {
for (Dataset dataset: datasets) {
log.info ("{} is timed out and give up the verification, adding a failed task", dataset.datasetURN());
// create failed task for these failed datasets
- this.workUnitIterator.addWorkUnit (createWorkUnitForFailure(dataset, failedReasonMap.get(dataset.getUrn())));
+ this.workUnitIterator.addWorkUnit(createWorkUnitForFailure(dataset, failedReasonMap.get(dataset.getUrn())));
}
}
@@ -316,11 +316,11 @@ public class CompactionSource implements WorkUnitStreamSource<String, String> {
* {@link VerifiedDataset} wraps original {@link Dataset} because if verification failed, we are able get original
* datasets and restart the entire process of verification against those failed datasets.
*/
- public VerifiedDataset call () throws DatasetVerificationException {
+ public VerifiedDataset call() throws DatasetVerificationException {
try {
VerifiedResult result = this.verify(dataset);
if (result.allVerificationPassed) {
- this.workUnitIterator.addWorkUnit (createWorkUnit(dataset));
+ this.workUnitIterator.addWorkUnit(createWorkUnit(dataset));
}
return new VerifiedDataset(dataset, result);
} catch (Exception e) {
@@ -422,16 +422,18 @@ public class CompactionSource implements WorkUnitStreamSource<String, String> {
}
}
- protected WorkUnit createWorkUnit (Dataset dataset) throws IOException {
+ protected WorkUnit createWorkUnit(Dataset dataset) throws IOException {
WorkUnit workUnit = new WorkUnit();
TaskUtils.setTaskFactoryClass(workUnit, MRCompactionTaskFactory.class);
suite.save(dataset, workUnit);
+ workUnit.setProp(ConfigurationKeys.DATASET_URN_KEY, dataset.getUrn());
return workUnit;
}
protected WorkUnit createWorkUnitForFailure (Dataset dataset) throws IOException {
WorkUnit workUnit = new FailedTask.FailedWorkUnit();
TaskUtils.setTaskFactoryClass(workUnit, CompactionFailedTask.CompactionFailedTaskFactory.class);
+ workUnit.setProp(ConfigurationKeys.DATASET_URN_KEY, dataset.getUrn());
suite.save(dataset, workUnit);
return workUnit;
}
@@ -440,6 +442,7 @@ public class CompactionSource implements WorkUnitStreamSource<String, String> {
WorkUnit workUnit = new FailedTask.FailedWorkUnit();
workUnit.setProp(CompactionVerifier.COMPACTION_VERIFICATION_FAIL_REASON, reason);
TaskUtils.setTaskFactoryClass(workUnit, CompactionFailedTask.CompactionFailedTaskFactory.class);
+ workUnit.setProp(ConfigurationKeys.DATASET_URN_KEY, dataset.getUrn());
suite.save(dataset, workUnit);
return workUnit;
}
diff --git a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/AvroCompactionTaskTest.java b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/AvroCompactionTaskTest.java
index 048fe47..fb53a5c 100644
--- a/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/AvroCompactionTaskTest.java
+++ b/gobblin-compaction/src/test/java/org/apache/gobblin/compaction/mapreduce/AvroCompactionTaskTest.java
@@ -266,7 +266,7 @@ public class AvroCompactionTaskTest {
}
@Test
- public void testWorkUnitStream () throws Exception {
+ public void testWorkUnitStream() throws Exception {
File basePath = Files.createTempDir();
basePath.deleteOnExit();
GenericRecord r1 = createRandomRecord();