You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by ab...@apache.org on 2018/03/21 08:31:03 UTC

[45/50] incubator-gobblin git commit: [GOBBLIN-422] Update fs snapshot in previously failed workunits with the current effectiveSnapshot

[GOBBLIN-422] Update fs snapshot in previously failed workunits with the current effectiveSnapshot

Closes #2299 from ragepati/ragepati-filebasedsource-prevfssnapshot


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/979ad2a0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/979ad2a0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/979ad2a0

Branch: refs/heads/0.12.0
Commit: 979ad2a090600495ad3b47de462d39b2f2ab33ea
Parents: 8636b0c
Author: Raul Agepati <ra...@users.noreply.github.com>
Authored: Mon Mar 5 15:05:10 2018 -0800
Committer: Abhishek Tiwari <ab...@gmail.com>
Committed: Mon Mar 5 15:05:10 2018 -0800

----------------------------------------------------------------------
 .../gobblin/source/extractor/filebased/FileBasedSource.java     | 5 +++++
 1 file changed, 5 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/979ad2a0/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/filebased/FileBasedSource.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/filebased/FileBasedSource.java b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/filebased/FileBasedSource.java
index 46a0de0..e34a28f 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/filebased/FileBasedSource.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/filebased/FileBasedSource.java
@@ -152,6 +152,11 @@ public abstract class FileBasedSource<S, D> extends AbstractSource<S, D> {
         // file is not pulled this run
       }
     }
+    // Update the snapshot from the previous run with the new files processed in this run
+    // Otherwise a corrupt file could cause re-processing of already processed files
+    for (WorkUnit workUnit : previousWorkUnitsForRetry) {
+      workUnit.setProp(ConfigurationKeys.SOURCE_FILEBASED_FS_SNAPSHOT, StringUtils.join(effectiveSnapshot, ","));
+    }
 
     if (!filesToPull.isEmpty()) {
       logFilesToPull(filesToPull);