You are viewing a plain-text version of this content. (The canonical link to the original was not preserved in this copy.)
Posted to commits@gobblin.apache.org by hu...@apache.org on 2020/06/04 21:20:08 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-1174] Fail job on FileBasedSource ls invalid source directory

This is an automated email from the ASF dual-hosted git repository.

hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 576fa19  [GOBBLIN-1174] Fail job on FileBasedSource ls invalid source directory
576fa19 is described below

commit 576fa19f560642b6cc11d41ad4e192d3fb70ead9
Author: zhchen <zh...@linkedin.com>
AuthorDate: Thu Jun 4 14:20:00 2020 -0700

    [GOBBLIN-1174] Fail job on FileBasedSource ls invalid source directory
    
    Closes #3019 from zxcware/fshelper
---
 .../source/extractor/filebased/FileBasedSource.java     |  8 +++++---
 .../gobblin/source/extractor/hadoop/AvroFileSource.java | 17 +++++++++--------
 .../source/extractor/filebased/FileBasedSourceTest.java | 11 +++++++++++
 3 files changed, 25 insertions(+), 11 deletions(-)

diff --git a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/filebased/FileBasedSource.java b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/filebased/FileBasedSource.java
index 941935c..973a7b0 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/filebased/FileBasedSource.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/filebased/FileBasedSource.java
@@ -264,7 +264,7 @@ public abstract class FileBasedSource<S, D> extends AbstractSource<S, D> {
    * directory
    */
   public List<String> getcurrentFsSnapshot(State state) {
-    List<String> results = new ArrayList<>();
+    List<String> results;
     String path = getLsPattern(state);
 
     try {
@@ -280,8 +280,10 @@ public abstract class FileBasedSource<S, D> extends AbstractSource<S, D> {
         results.set(i, filePath + this.splitPattern + this.fsHelper.getFileMTime(filePath));
       }
     } catch (FileBasedHelperException | URISyntaxException e) {
-      log.error("Not able to fetch the filename/file modified time to " + e.getMessage() + " will not pull any files",
-          e);
+      String errMsg = String.format(
+          "Not able to fetch the filename/file modified time to %s. Will not pull any files", e.getMessage());
+      log.error(errMsg, e);
+      throw new RuntimeException(errMsg, e);
     }
     return results;
   }
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/hadoop/AvroFileSource.java b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/hadoop/AvroFileSource.java
index f189b47..00f9131 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/hadoop/AvroFileSource.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/hadoop/AvroFileSource.java
@@ -17,26 +17,24 @@
 
 package org.apache.gobblin.source.extractor.hadoop;
 
-import org.apache.gobblin.source.extractor.filebased.FileBasedHelperException;
 import java.io.IOException;
 import java.util.List;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.configuration.WorkUnitState;
 import org.apache.gobblin.source.extractor.Extractor;
+import org.apache.gobblin.source.extractor.filebased.FileBasedHelperException;
 import org.apache.gobblin.source.extractor.filebased.FileBasedSource;
 
 
+@Slf4j
 public class AvroFileSource extends FileBasedSource<Schema, GenericRecord> {
-  private static final Logger LOGGER = LoggerFactory.getLogger(AvroFileSource.class);
 
   @Override
   public Extractor<Schema, GenericRecord> getExtractor(WorkUnitState state) throws IOException {
@@ -51,14 +49,17 @@ public class AvroFileSource extends FileBasedSource<Schema, GenericRecord> {
 
   @Override
   public List<String> getcurrentFsSnapshot(State state) {
-    List<String> results = Lists.newArrayList();
+    List<String> results;
     String path = state.getProp(ConfigurationKeys.SOURCE_FILEBASED_DATA_DIRECTORY);
 
     try {
-      LOGGER.info("Running ls command with input " + path);
+      log.info("Running ls command with input " + path);
       results = this.fsHelper.ls(path);
     } catch (FileBasedHelperException e) {
-      LOGGER.error("Not able to run ls command due to " + e.getMessage() + " will not pull any files", e);
+      String errMsg = String.format(
+          "Not able to run ls command due to %s. Will not pull any files", e.getMessage());
+      log.error(errMsg, e);
+      throw new RuntimeException(errMsg, e);
     }
     return results;
   }
diff --git a/gobblin-core/src/test/java/org/apache/gobblin/source/extractor/filebased/FileBasedSourceTest.java b/gobblin-core/src/test/java/org/apache/gobblin/source/extractor/filebased/FileBasedSourceTest.java
index 15d9616..af752b8 100644
--- a/gobblin-core/src/test/java/org/apache/gobblin/source/extractor/filebased/FileBasedSourceTest.java
+++ b/gobblin-core/src/test/java/org/apache/gobblin/source/extractor/filebased/FileBasedSourceTest.java
@@ -104,6 +104,17 @@ public class FileBasedSourceTest {
     Assert.assertEquals(3, workUnits.size());
   }
 
+  @Test(expectedExceptions = RuntimeException.class)
+  public void testFailOnInvalidSourceDirectory() {
+    SourceState sourceState = new SourceState();
+    sourceState.setBroker(jobBroker);
+    AvroFileSource source = new AvroFileSource();
+    initState(sourceState);
+    Path path = new Path(sourceDir, "testFailOnInvalidSourceDirectory");
+    sourceState.setProp(ConfigurationKeys.SOURCE_FILEBASED_DATA_DIRECTORY, path.toString());
+    source.getWorkunits(sourceState);
+  }
+
   @Test
   public void testSourceLineage() {
     String dataset = Path.getPathWithoutSchemeAndAuthority(sourceDir).toString();