You are viewing a plain-text version of this content. The canonical link to it (originally a hyperlink on the word "here") is not preserved in this plain-text copy.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2018/01/10 19:55:52 UTC

incubator-gobblin git commit: [GOBBLIN-364] Exclude JobState from WorkUnit created by PartitionedFileSourceBase

Repository: incubator-gobblin
Updated Branches:
  refs/heads/master fbf7c9bbd -> df75b13e1


[GOBBLIN-364] Exclude JobState from WorkUnit created by PartitionedFileSourceBase

Closes #2237 from zxcware/file


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/df75b13e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/df75b13e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/df75b13e

Branch: refs/heads/master
Commit: df75b13e1f7ee5776ed18b7aade9014fae8deeea
Parents: fbf7c9b
Author: zhchen <zh...@linkedin.com>
Authored: Wed Jan 10 11:55:45 2018 -0800
Committer: Hung Tran <hu...@linkedin.com>
Committed: Wed Jan 10 11:55:45 2018 -0800

----------------------------------------------------------------------
 .../source/PartitionedFileSourceBase.java       | 24 ++++++-------
 .../extractor/filebased/FileBasedSource.java    | 17 +++++----
 .../DatePartitionedAvroFileExtractorTest.java   | 36 ++++++++++++++++++++
 3 files changed, 54 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/df75b13e/gobblin-core/src/main/java/org/apache/gobblin/source/PartitionedFileSourceBase.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/source/PartitionedFileSourceBase.java b/gobblin-core/src/main/java/org/apache/gobblin/source/PartitionedFileSourceBase.java
index d317e54..9ec7707 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/source/PartitionedFileSourceBase.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/source/PartitionedFileSourceBase.java
@@ -249,15 +249,14 @@ public abstract class PartitionedFileSourceBase<SCHEMA, DATA> extends FileBasedS
 
   private Extract getExtractForFile(PartitionAwareFileRetriever.FileInfo file,
       String topicName,
-      SourceState partitionState,
+      String namespace,
       Map<Long, Extract> extractMap) {
     Extract extract = extractMap.get(file.getWatermarkMsSinceEpoch());
 
     if (extract == null) {
       // Create an extract object for the dayPath
 
-      extract = partitionState
-          .createExtract(this.tableType, partitionState.getProp(ConfigurationKeys.EXTRACT_NAMESPACE_NAME_KEY), topicName);
+      extract = new Extract(this.tableType, namespace, topicName);
 
       LOG.info("Created extract: " + extract.getExtractId() + " for path " + topicName);
       extractMap.put(file.getWatermarkMsSinceEpoch(), extract);
@@ -277,23 +276,20 @@ public abstract class PartitionedFileSourceBase<SCHEMA, DATA> extends FileBasedS
           retriever.getFilesToProcess(this.lowWaterMark, this.maxFilesPerJob - this.fileCount);
       Collections.sort(filesToPull);
       String topicName = this.sourceDir.getName();
+      String namespace = this.sourceState.getProp(ConfigurationKeys.EXTRACT_NAMESPACE_NAME_KEY);
 
-      SourceState partitionState = new SourceState();
-
-      partitionState.addAll(this.sourceState);
-      partitionState.setProp(ConfigurationKeys.SOURCE_ENTITY, topicName);
       Map<Long, Extract> extractMap = new HashMap<>();
       for (PartitionAwareFileRetriever.FileInfo file : filesToPull) {
-        Extract extract = getExtractForFile(file, topicName, partitionState, extractMap);
+        Extract extract = getExtractForFile(file, topicName, namespace, extractMap);
 
         LOG.info("Will process file " + file.getFilePath());
 
-        partitionState.setProp(ConfigurationKeys.SOURCE_FILEBASED_FILES_TO_PULL, file.getFilePath());
-        partitionState.setProp(ConfigurationKeys.WORK_UNIT_LOW_WATER_MARK_KEY, file.getWatermarkMsSinceEpoch());
-        partitionState.setProp(ConfigurationKeys.WORK_UNIT_HIGH_WATER_MARK_KEY, file.getWatermarkMsSinceEpoch());
-        partitionState.setProp(ConfigurationKeys.WORK_UNIT_DATE_PARTITION_KEY, file.getWatermarkMsSinceEpoch());
-
-        WorkUnit singleWorkUnit = partitionState.createWorkUnit(extract);
+        WorkUnit singleWorkUnit = WorkUnit.create(extract);
+        singleWorkUnit.setProp(ConfigurationKeys.SOURCE_ENTITY, topicName);
+        singleWorkUnit.setProp(ConfigurationKeys.SOURCE_FILEBASED_FILES_TO_PULL, file.getFilePath());
+        singleWorkUnit.setProp(ConfigurationKeys.WORK_UNIT_LOW_WATER_MARK_KEY, file.getWatermarkMsSinceEpoch());
+        singleWorkUnit.setProp(ConfigurationKeys.WORK_UNIT_HIGH_WATER_MARK_KEY, file.getWatermarkMsSinceEpoch());
+        singleWorkUnit.setProp(ConfigurationKeys.WORK_UNIT_DATE_PARTITION_KEY, file.getWatermarkMsSinceEpoch());
 
         multiWorkUnitWeightedQueue.addWorkUnit(singleWorkUnit, file.getFileSize());
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/df75b13e/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/filebased/FileBasedSource.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/filebased/FileBasedSource.java b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/filebased/FileBasedSource.java
index d693f44..46a0de0 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/filebased/FileBasedSource.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/filebased/FileBasedSource.java
@@ -168,28 +168,27 @@ public abstract class FileBasedSource<S, D> extends AbstractSource<S, D> {
 
       // Distribute the files across the workunits
       for (int fileOffset = 0; fileOffset < filesToPull.size(); fileOffset += filesPerPartition) {
-        SourceState partitionState = new SourceState();
-        partitionState.addAll(state);
+        // Use extract table name to create extract
+        Extract extract = new Extract(tableType, nameSpaceName, extractTableName);
+        WorkUnit workUnit = WorkUnit.create(extract);
 
         // Eventually these setters should be integrated with framework support for generalized watermark handling
-        partitionState.setProp(ConfigurationKeys.SOURCE_FILEBASED_FS_SNAPSHOT,
+        workUnit.setProp(ConfigurationKeys.SOURCE_FILEBASED_FS_SNAPSHOT,
             StringUtils.join(effectiveSnapshot, ","));
 
         List<String> partitionFilesToPull = filesToPull.subList(fileOffset,
             fileOffset + filesPerPartition > filesToPull.size() ? filesToPull.size() : fileOffset + filesPerPartition);
-        partitionState.setProp(ConfigurationKeys.SOURCE_FILEBASED_FILES_TO_PULL,
+        workUnit.setProp(ConfigurationKeys.SOURCE_FILEBASED_FILES_TO_PULL,
             StringUtils.join(partitionFilesToPull, ","));
         if (state.getPropAsBoolean(ConfigurationKeys.SOURCE_FILEBASED_PRESERVE_FILE_NAME, false)) {
           if (partitionFilesToPull.size() != 1) {
             throw new RuntimeException("Cannot preserve the file name if a workunit is given multiple files");
           }
-          partitionState.setProp(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR,
-              partitionState.getProp(ConfigurationKeys.SOURCE_FILEBASED_FILES_TO_PULL));
+          workUnit.setProp(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR,
+              workUnit.getProp(ConfigurationKeys.SOURCE_FILEBASED_FILES_TO_PULL));
         }
 
-        // Use extract table name to create extract
-        Extract extract = partitionState.createExtract(tableType, nameSpaceName, extractTableName);
-        workUnits.add(partitionState.createWorkUnit(extract));
+        workUnits.add(workUnit);
       }
 
       log.info("Total number of work units for the current run: " + (workUnits.size() - previousWorkUnitsForRetry.size()));

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/df75b13e/gobblin-core/src/test/java/org/apache/gobblin/source/extractor/DatePartitionedAvroFileExtractorTest.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/test/java/org/apache/gobblin/source/extractor/DatePartitionedAvroFileExtractorTest.java b/gobblin-core/src/test/java/org/apache/gobblin/source/extractor/DatePartitionedAvroFileExtractorTest.java
index 73b0d06..299ab9a 100644
--- a/gobblin-core/src/test/java/org/apache/gobblin/source/extractor/DatePartitionedAvroFileExtractorTest.java
+++ b/gobblin-core/src/test/java/org/apache/gobblin/source/extractor/DatePartitionedAvroFileExtractorTest.java
@@ -151,6 +151,42 @@ public class DatePartitionedAvroFileExtractorTest {
   }
 
   @Test
+  public void testJobStateNotCopiedToWorkUnit() {
+
+    DatePartitionedAvroFileSource source = new DatePartitionedAvroFileSource();
+
+    SourceState state = new SourceState();
+    state.setProp(ConfigurationKeys.SOURCE_FILEBASED_FS_URI, ConfigurationKeys.LOCAL_FS_URI);
+    state.setProp(ConfigurationKeys.EXTRACT_NAMESPACE_NAME_KEY, SOURCE_ENTITY);
+    state.setProp(ConfigurationKeys.SOURCE_FILEBASED_DATA_DIRECTORY, OUTPUT_DIR + Path.SEPARATOR + SOURCE_ENTITY);
+    state.setProp(ConfigurationKeys.SOURCE_ENTITY, SOURCE_ENTITY);
+    state.setProp(ConfigurationKeys.SOURCE_MAX_NUMBER_OF_PARTITIONS, 2);
+
+    state.setProp("date.partitioned.source.partition.pattern", DATE_PATTERN);
+    state.setProp("date.partitioned.source.min.watermark.value", DateTimeFormat.forPattern(DATE_PATTERN).print(
+        this.startDateTime.minusMinutes(1)));
+    state.setProp(ConfigurationKeys.EXTRACT_TABLE_TYPE_KEY, TableType.SNAPSHOT_ONLY);
+    state.setProp("date.partitioned.source.partition.prefix", PREFIX);
+    state.setProp("date.partitioned.source.partition.suffix", SUFFIX);
+
+    String dummyKey = "dummy.job.config";
+    state.setProp(dummyKey, "dummy");
+
+    List<WorkUnit> workunits = source.getWorkunits(state);
+
+    Assert.assertEquals(workunits.size(), 4);
+    for(WorkUnit wu : workunits) {
+      if (wu instanceof MultiWorkUnit) {
+        for (WorkUnit workUnit : ((MultiWorkUnit) wu).getWorkUnits()) {
+          Assert.assertFalse(workUnit.contains(dummyKey));
+        }
+      } else {
+        Assert.assertFalse(wu.contains(dummyKey));
+      }
+    }
+  }
+
+  @Test
   public void testReadPartitionsByMinute() throws IOException, DataRecordException {
 
     DatePartitionedAvroFileSource source = new DatePartitionedAvroFileSource();