You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by ab...@apache.org on 2018/03/21 08:31:03 UTC
[45/50] incubator-gobblin git commit: [GOBBLIN-422] Update fs snapshot in previously failed workunits with the current effectiveSnapshot
[GOBBLIN-422] Update fs snapshot in previously failed workunits with the current effectiveSnapshot
Closes #2299 from ragepati/ragepati-filebasedsource-prevfssnapshot
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/979ad2a0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/979ad2a0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/979ad2a0
Branch: refs/heads/0.12.0
Commit: 979ad2a090600495ad3b47de462d39b2f2ab33ea
Parents: 8636b0c
Author: Raul Agepati <ra...@users.noreply.github.com>
Authored: Mon Mar 5 15:05:10 2018 -0800
Committer: Abhishek Tiwari <ab...@gmail.com>
Committed: Mon Mar 5 15:05:10 2018 -0800
----------------------------------------------------------------------
.../gobblin/source/extractor/filebased/FileBasedSource.java | 5 +++++
1 file changed, 5 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/979ad2a0/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/filebased/FileBasedSource.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/filebased/FileBasedSource.java b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/filebased/FileBasedSource.java
index 46a0de0..e34a28f 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/filebased/FileBasedSource.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/filebased/FileBasedSource.java
@@ -152,6 +152,11 @@ public abstract class FileBasedSource<S, D> extends AbstractSource<S, D> {
// file is not pulled this run
}
}
+ // Update the snapshot from the previous run with the new files processed in this run
+ // Otherwise a corrupt file could cause re-processing of already processed files
+ for (WorkUnit workUnit : previousWorkUnitsForRetry) {
+ workUnit.setProp(ConfigurationKeys.SOURCE_FILEBASED_FS_SNAPSHOT, StringUtils.join(effectiveSnapshot, ","));
+ }
if (!filesToPull.isEmpty()) {
logFilesToPull(filesToPull);