You are viewing a plain-text version of this content. The canonical link to the original was not preserved in this plain-text copy.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2020/06/04 21:20:08 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-1174] Fail job on FileBasedSource ls invalid source directory
This is an automated email from the ASF dual-hosted git repository.
hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 576fa19 [GOBBLIN-1174] Fail job on FileBasedSource ls invalid source directory
576fa19 is described below
commit 576fa19f560642b6cc11d41ad4e192d3fb70ead9
Author: zhchen <zh...@linkedin.com>
AuthorDate: Thu Jun 4 14:20:00 2020 -0700
[GOBBLIN-1174] Fail job on FileBasedSource ls invalid source directory
Closes #3019 from zxcware/fshelper
---
.../source/extractor/filebased/FileBasedSource.java | 8 +++++---
.../gobblin/source/extractor/hadoop/AvroFileSource.java | 17 +++++++++--------
.../source/extractor/filebased/FileBasedSourceTest.java | 11 +++++++++++
3 files changed, 25 insertions(+), 11 deletions(-)
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/filebased/FileBasedSource.java b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/filebased/FileBasedSource.java
index 941935c..973a7b0 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/filebased/FileBasedSource.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/filebased/FileBasedSource.java
@@ -264,7 +264,7 @@ public abstract class FileBasedSource<S, D> extends AbstractSource<S, D> {
* directory
*/
public List<String> getcurrentFsSnapshot(State state) {
- List<String> results = new ArrayList<>();
+ List<String> results;
String path = getLsPattern(state);
try {
@@ -280,8 +280,10 @@ public abstract class FileBasedSource<S, D> extends AbstractSource<S, D> {
results.set(i, filePath + this.splitPattern + this.fsHelper.getFileMTime(filePath));
}
} catch (FileBasedHelperException | URISyntaxException e) {
- log.error("Not able to fetch the filename/file modified time to " + e.getMessage() + " will not pull any files",
- e);
+ String errMsg = String.format(
+ "Not able to fetch the filename/file modified time to %s. Will not pull any files", e.getMessage());
+ log.error(errMsg, e);
+ throw new RuntimeException(errMsg, e);
}
return results;
}
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/hadoop/AvroFileSource.java b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/hadoop/AvroFileSource.java
index f189b47..00f9131 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/hadoop/AvroFileSource.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/hadoop/AvroFileSource.java
@@ -17,26 +17,24 @@
package org.apache.gobblin.source.extractor.hadoop;
-import org.apache.gobblin.source.extractor.filebased.FileBasedHelperException;
import java.io.IOException;
import java.util.List;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.source.extractor.Extractor;
+import org.apache.gobblin.source.extractor.filebased.FileBasedHelperException;
import org.apache.gobblin.source.extractor.filebased.FileBasedSource;
+@Slf4j
public class AvroFileSource extends FileBasedSource<Schema, GenericRecord> {
- private static final Logger LOGGER = LoggerFactory.getLogger(AvroFileSource.class);
@Override
public Extractor<Schema, GenericRecord> getExtractor(WorkUnitState state) throws IOException {
@@ -51,14 +49,17 @@ public class AvroFileSource extends FileBasedSource<Schema, GenericRecord> {
@Override
public List<String> getcurrentFsSnapshot(State state) {
- List<String> results = Lists.newArrayList();
+ List<String> results;
String path = state.getProp(ConfigurationKeys.SOURCE_FILEBASED_DATA_DIRECTORY);
try {
- LOGGER.info("Running ls command with input " + path);
+ log.info("Running ls command with input " + path);
results = this.fsHelper.ls(path);
} catch (FileBasedHelperException e) {
- LOGGER.error("Not able to run ls command due to " + e.getMessage() + " will not pull any files", e);
+ String errMsg = String.format(
+ "Not able to run ls command due to %s. Will not pull any files", e.getMessage());
+ log.error(errMsg, e);
+ throw new RuntimeException(errMsg, e);
}
return results;
}
diff --git a/gobblin-core/src/test/java/org/apache/gobblin/source/extractor/filebased/FileBasedSourceTest.java b/gobblin-core/src/test/java/org/apache/gobblin/source/extractor/filebased/FileBasedSourceTest.java
index 15d9616..af752b8 100644
--- a/gobblin-core/src/test/java/org/apache/gobblin/source/extractor/filebased/FileBasedSourceTest.java
+++ b/gobblin-core/src/test/java/org/apache/gobblin/source/extractor/filebased/FileBasedSourceTest.java
@@ -104,6 +104,17 @@ public class FileBasedSourceTest {
Assert.assertEquals(3, workUnits.size());
}
+ @Test(expectedExceptions = RuntimeException.class)
+ public void testFailOnInvalidSourceDirectory() {
+ SourceState sourceState = new SourceState();
+ sourceState.setBroker(jobBroker);
+ AvroFileSource source = new AvroFileSource();
+ initState(sourceState);
+ Path path = new Path(sourceDir, "testFailOnInvalidSourceDirectory");
+ sourceState.setProp(ConfigurationKeys.SOURCE_FILEBASED_DATA_DIRECTORY, path.toString());
+ source.getWorkunits(sourceState);
+ }
+
@Test
public void testSourceLineage() {
String dataset = Path.getPathWithoutSchemeAndAuthority(sourceDir).toString();