You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2018/01/10 19:55:52 UTC
incubator-gobblin git commit: [GOBBLIN-364] Exclude JobState from WorkUnit created by PartitionedFileSourceBase
Repository: incubator-gobblin
Updated Branches:
refs/heads/master fbf7c9bbd -> df75b13e1
[GOBBLIN-364] Exclude JobState from WorkUnit created by PartitionedFileSourceBase
Closes #2237 from zxcware/file
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/df75b13e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/df75b13e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/df75b13e
Branch: refs/heads/master
Commit: df75b13e1f7ee5776ed18b7aade9014fae8deeea
Parents: fbf7c9b
Author: zhchen <zh...@linkedin.com>
Authored: Wed Jan 10 11:55:45 2018 -0800
Committer: Hung Tran <hu...@linkedin.com>
Committed: Wed Jan 10 11:55:45 2018 -0800
----------------------------------------------------------------------
.../source/PartitionedFileSourceBase.java | 24 ++++++-------
.../extractor/filebased/FileBasedSource.java | 17 +++++----
.../DatePartitionedAvroFileExtractorTest.java | 36 ++++++++++++++++++++
3 files changed, 54 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/df75b13e/gobblin-core/src/main/java/org/apache/gobblin/source/PartitionedFileSourceBase.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/source/PartitionedFileSourceBase.java b/gobblin-core/src/main/java/org/apache/gobblin/source/PartitionedFileSourceBase.java
index d317e54..9ec7707 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/source/PartitionedFileSourceBase.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/source/PartitionedFileSourceBase.java
@@ -249,15 +249,14 @@ public abstract class PartitionedFileSourceBase<SCHEMA, DATA> extends FileBasedS
private Extract getExtractForFile(PartitionAwareFileRetriever.FileInfo file,
String topicName,
- SourceState partitionState,
+ String namespace,
Map<Long, Extract> extractMap) {
Extract extract = extractMap.get(file.getWatermarkMsSinceEpoch());
if (extract == null) {
// Create an extract object for the dayPath
- extract = partitionState
- .createExtract(this.tableType, partitionState.getProp(ConfigurationKeys.EXTRACT_NAMESPACE_NAME_KEY), topicName);
+ extract = new Extract(this.tableType, namespace, topicName);
LOG.info("Created extract: " + extract.getExtractId() + " for path " + topicName);
extractMap.put(file.getWatermarkMsSinceEpoch(), extract);
@@ -277,23 +276,20 @@ public abstract class PartitionedFileSourceBase<SCHEMA, DATA> extends FileBasedS
retriever.getFilesToProcess(this.lowWaterMark, this.maxFilesPerJob - this.fileCount);
Collections.sort(filesToPull);
String topicName = this.sourceDir.getName();
+ String namespace = this.sourceState.getProp(ConfigurationKeys.EXTRACT_NAMESPACE_NAME_KEY);
- SourceState partitionState = new SourceState();
-
- partitionState.addAll(this.sourceState);
- partitionState.setProp(ConfigurationKeys.SOURCE_ENTITY, topicName);
Map<Long, Extract> extractMap = new HashMap<>();
for (PartitionAwareFileRetriever.FileInfo file : filesToPull) {
- Extract extract = getExtractForFile(file, topicName, partitionState, extractMap);
+ Extract extract = getExtractForFile(file, topicName, namespace, extractMap);
LOG.info("Will process file " + file.getFilePath());
- partitionState.setProp(ConfigurationKeys.SOURCE_FILEBASED_FILES_TO_PULL, file.getFilePath());
- partitionState.setProp(ConfigurationKeys.WORK_UNIT_LOW_WATER_MARK_KEY, file.getWatermarkMsSinceEpoch());
- partitionState.setProp(ConfigurationKeys.WORK_UNIT_HIGH_WATER_MARK_KEY, file.getWatermarkMsSinceEpoch());
- partitionState.setProp(ConfigurationKeys.WORK_UNIT_DATE_PARTITION_KEY, file.getWatermarkMsSinceEpoch());
-
- WorkUnit singleWorkUnit = partitionState.createWorkUnit(extract);
+ WorkUnit singleWorkUnit = WorkUnit.create(extract);
+ singleWorkUnit.setProp(ConfigurationKeys.SOURCE_ENTITY, topicName);
+ singleWorkUnit.setProp(ConfigurationKeys.SOURCE_FILEBASED_FILES_TO_PULL, file.getFilePath());
+ singleWorkUnit.setProp(ConfigurationKeys.WORK_UNIT_LOW_WATER_MARK_KEY, file.getWatermarkMsSinceEpoch());
+ singleWorkUnit.setProp(ConfigurationKeys.WORK_UNIT_HIGH_WATER_MARK_KEY, file.getWatermarkMsSinceEpoch());
+ singleWorkUnit.setProp(ConfigurationKeys.WORK_UNIT_DATE_PARTITION_KEY, file.getWatermarkMsSinceEpoch());
multiWorkUnitWeightedQueue.addWorkUnit(singleWorkUnit, file.getFileSize());
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/df75b13e/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/filebased/FileBasedSource.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/filebased/FileBasedSource.java b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/filebased/FileBasedSource.java
index d693f44..46a0de0 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/filebased/FileBasedSource.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/filebased/FileBasedSource.java
@@ -168,28 +168,27 @@ public abstract class FileBasedSource<S, D> extends AbstractSource<S, D> {
// Distribute the files across the workunits
for (int fileOffset = 0; fileOffset < filesToPull.size(); fileOffset += filesPerPartition) {
- SourceState partitionState = new SourceState();
- partitionState.addAll(state);
+ // Use extract table name to create extract
+ Extract extract = new Extract(tableType, nameSpaceName, extractTableName);
+ WorkUnit workUnit = WorkUnit.create(extract);
// Eventually these setters should be integrated with framework support for generalized watermark handling
- partitionState.setProp(ConfigurationKeys.SOURCE_FILEBASED_FS_SNAPSHOT,
+ workUnit.setProp(ConfigurationKeys.SOURCE_FILEBASED_FS_SNAPSHOT,
StringUtils.join(effectiveSnapshot, ","));
List<String> partitionFilesToPull = filesToPull.subList(fileOffset,
fileOffset + filesPerPartition > filesToPull.size() ? filesToPull.size() : fileOffset + filesPerPartition);
- partitionState.setProp(ConfigurationKeys.SOURCE_FILEBASED_FILES_TO_PULL,
+ workUnit.setProp(ConfigurationKeys.SOURCE_FILEBASED_FILES_TO_PULL,
StringUtils.join(partitionFilesToPull, ","));
if (state.getPropAsBoolean(ConfigurationKeys.SOURCE_FILEBASED_PRESERVE_FILE_NAME, false)) {
if (partitionFilesToPull.size() != 1) {
throw new RuntimeException("Cannot preserve the file name if a workunit is given multiple files");
}
- partitionState.setProp(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR,
- partitionState.getProp(ConfigurationKeys.SOURCE_FILEBASED_FILES_TO_PULL));
+ workUnit.setProp(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR,
+ workUnit.getProp(ConfigurationKeys.SOURCE_FILEBASED_FILES_TO_PULL));
}
- // Use extract table name to create extract
- Extract extract = partitionState.createExtract(tableType, nameSpaceName, extractTableName);
- workUnits.add(partitionState.createWorkUnit(extract));
+ workUnits.add(workUnit);
}
log.info("Total number of work units for the current run: " + (workUnits.size() - previousWorkUnitsForRetry.size()));
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/df75b13e/gobblin-core/src/test/java/org/apache/gobblin/source/extractor/DatePartitionedAvroFileExtractorTest.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/test/java/org/apache/gobblin/source/extractor/DatePartitionedAvroFileExtractorTest.java b/gobblin-core/src/test/java/org/apache/gobblin/source/extractor/DatePartitionedAvroFileExtractorTest.java
index 73b0d06..299ab9a 100644
--- a/gobblin-core/src/test/java/org/apache/gobblin/source/extractor/DatePartitionedAvroFileExtractorTest.java
+++ b/gobblin-core/src/test/java/org/apache/gobblin/source/extractor/DatePartitionedAvroFileExtractorTest.java
@@ -151,6 +151,42 @@ public class DatePartitionedAvroFileExtractorTest {
}
@Test
+ public void testJobStateNotCopiedToWorkUnit() {
+
+ DatePartitionedAvroFileSource source = new DatePartitionedAvroFileSource();
+
+ SourceState state = new SourceState();
+ state.setProp(ConfigurationKeys.SOURCE_FILEBASED_FS_URI, ConfigurationKeys.LOCAL_FS_URI);
+ state.setProp(ConfigurationKeys.EXTRACT_NAMESPACE_NAME_KEY, SOURCE_ENTITY);
+ state.setProp(ConfigurationKeys.SOURCE_FILEBASED_DATA_DIRECTORY, OUTPUT_DIR + Path.SEPARATOR + SOURCE_ENTITY);
+ state.setProp(ConfigurationKeys.SOURCE_ENTITY, SOURCE_ENTITY);
+ state.setProp(ConfigurationKeys.SOURCE_MAX_NUMBER_OF_PARTITIONS, 2);
+
+ state.setProp("date.partitioned.source.partition.pattern", DATE_PATTERN);
+ state.setProp("date.partitioned.source.min.watermark.value", DateTimeFormat.forPattern(DATE_PATTERN).print(
+ this.startDateTime.minusMinutes(1)));
+ state.setProp(ConfigurationKeys.EXTRACT_TABLE_TYPE_KEY, TableType.SNAPSHOT_ONLY);
+ state.setProp("date.partitioned.source.partition.prefix", PREFIX);
+ state.setProp("date.partitioned.source.partition.suffix", SUFFIX);
+
+ String dummyKey = "dummy.job.config";
+ state.setProp(dummyKey, "dummy");
+
+ List<WorkUnit> workunits = source.getWorkunits(state);
+
+ Assert.assertEquals(workunits.size(), 4);
+ for(WorkUnit wu : workunits) {
+ if (wu instanceof MultiWorkUnit) {
+ for (WorkUnit workUnit : ((MultiWorkUnit) wu).getWorkUnits()) {
+ Assert.assertFalse(workUnit.contains(dummyKey));
+ }
+ } else {
+ Assert.assertFalse(wu.contains(dummyKey));
+ }
+ }
+ }
+
+ @Test
public void testReadPartitionsByMinute() throws IOException, DataRecordException {
DatePartitionedAvroFileSource source = new DatePartitionedAvroFileSource();