You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by ab...@apache.org on 2018/03/21 08:30:45 UTC

[27/50] incubator-gobblin git commit: [GOBBLIN-396] add date partitioned based json source

[GOBBLIN-396] add date partitioned based json source

Closes #2270 from
arjun4084346/jsonDatePartitionedSource


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/5c678d9b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/5c678d9b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/5c678d9b

Branch: refs/heads/0.12.0
Commit: 5c678d9b6b008c9fef0eeea731f2bf19e55e1cea
Parents: 457ede2
Author: Arjun <ab...@linkedin.com>
Authored: Thu Feb 8 11:40:19 2018 -0800
Committer: Hung Tran <hu...@linkedin.com>
Committed: Thu Feb 8 11:40:19 2018 -0800

----------------------------------------------------------------------
 .../configuration/ConfigurationKeys.java        |   6 +
 .../JsonRecordAvroSchemaToAvroConverter.java    |  11 +-
 .../source/DatePartitionedJsonFileSource.java   |  39 ++++++
 .../source/DatePartitionedNestedRetriever.java  |   9 +-
 .../source/PartitionedFileSourceBase.java       |  20 +++-
 .../source/RegexBasedPartitionedRetriever.java  |   9 +-
 .../DatePartitionedJsonFileExtractor.java       |  30 +++++
 .../source/extractor/SimpleJsonExtractor.java   | 118 +++++++++++++++++++
 ...JsonRecordAvroSchemaToAvroConverterTest.java |   5 +-
 .../filebased/FileBasedSourceTest.java          |  23 ++++
 .../test/resources/source/2017-12/metadata.json |   1 +
 .../resources/source/2017-12/simplejson.json    |   3 +
 .../test/resources/source/2018-01/metadata.json |   1 +
 .../resources/source/2018-01/simplejson.json    |   3 +
 .../resources/source/2018-01/simplejson2.json   |   3 +
 15 files changed, 268 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5c678d9b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 267a17e..d07d740 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -302,6 +302,8 @@ public class ConfigurationKeys {
   public static final String CONVERTER_AVRO_FIELD_PICK_FIELDS = "converter.avro.fields";
   public static final String CONVERTER_AVRO_JDBC_ENTRY_FIELDS_PAIRS = "converter.avro.jdbc.entry_fields_pairs";
   public static final String CONVERTER_SKIP_FAILED_RECORD = "converter.skipFailedRecord";
+  public static final String CONVERTER_AVRO_SCHEMA_KEY = "converter.avroSchema";
+  public static final String CONVERTER_IGNORE_FIELDS = "converter.ignoreFields";
 
   /**
    * Fork operator configuration properties.
@@ -452,6 +454,10 @@ public class ConfigurationKeys {
    * Configuration properties used by the extractor.
    */
   public static final String SOURCE_ENTITY = "source.entity";
+  public static final String SCHEMA_IN_SOURCE_DIR = "schema.in.source.dir";
+  public static final boolean DEFAULT_SCHEMA_IN_SOURCE_DIR = false;
+  public static final String SCHEMA_FILENAME = "schema.filename";
+  public static final String DEFAULT_SCHEMA_FILENAME = "metadata.json";
 
   // Comma-separated source entity names
   public static final String SOURCE_ENTITIES = "source.entities";

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5c678d9b/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonRecordAvroSchemaToAvroConverter.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonRecordAvroSchemaToAvroConverter.java b/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonRecordAvroSchemaToAvroConverter.java
index 11f85f4..8e25975 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonRecordAvroSchemaToAvroConverter.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonRecordAvroSchemaToAvroConverter.java
@@ -22,6 +22,7 @@ import java.util.List;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.WorkUnitState;
 import org.apache.gobblin.converter.Converter;
 import org.apache.gobblin.converter.DataConversionException;
@@ -42,18 +43,12 @@ import com.google.common.base.Splitter;
 public class JsonRecordAvroSchemaToAvroConverter<SI> extends ToAvroConverterBase<SI, JsonObject> {
 
   private static final Splitter SPLITTER_ON_COMMA = Splitter.on(',').trimResults().omitEmptyStrings();
-
-  public static final String AVRO_SCHEMA_KEY = "converter.avroSchema";
-  public static final String IGNORE_FIELDS = "converter.ignoreFields";
-
   private Schema schema;
   private List<String> ignoreFields;
 
   public ToAvroConverterBase<SI, JsonObject> init(WorkUnitState workUnit) {
     super.init(workUnit);
-    Preconditions.checkArgument(workUnit.contains(AVRO_SCHEMA_KEY));
-    this.schema = new Schema.Parser().parse(workUnit.getProp(AVRO_SCHEMA_KEY));
-    this.ignoreFields = SPLITTER_ON_COMMA.splitToList(workUnit.getProp(IGNORE_FIELDS, ""));
+    this.ignoreFields = SPLITTER_ON_COMMA.splitToList(workUnit.getProp(ConfigurationKeys.CONVERTER_IGNORE_FIELDS, ""));
     return this;
   }
 
@@ -62,6 +57,8 @@ public class JsonRecordAvroSchemaToAvroConverter<SI> extends ToAvroConverterBase
    */
   @Override
   public Schema convertSchema(SI inputSchema, WorkUnitState workUnit) throws SchemaConversionException {
+    Preconditions.checkArgument(workUnit.contains(ConfigurationKeys.CONVERTER_AVRO_SCHEMA_KEY));
+    this.schema = new Schema.Parser().parse(workUnit.getProp(ConfigurationKeys.CONVERTER_AVRO_SCHEMA_KEY));
     return this.schema;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5c678d9b/gobblin-core/src/main/java/org/apache/gobblin/source/DatePartitionedJsonFileSource.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/source/DatePartitionedJsonFileSource.java b/gobblin-core/src/main/java/org/apache/gobblin/source/DatePartitionedJsonFileSource.java
new file mode 100644
index 0000000..cc5167a
--- /dev/null
+++ b/gobblin-core/src/main/java/org/apache/gobblin/source/DatePartitionedJsonFileSource.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.source;
+
+import java.io.IOException;
+
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.source.extractor.DatePartitionedJsonFileExtractor;
+import org.apache.gobblin.source.extractor.Extractor;
+
+import com.google.gson.JsonObject;
+
+public class DatePartitionedJsonFileSource extends PartitionedFileSourceBase<String, JsonObject> {
+  private static final String JSON_FILE_EXTENSION = ".json"; // the retriever's expected data-file extension
+
+  public DatePartitionedJsonFileSource() {
+    super(new DatePartitionedNestedRetriever(JSON_FILE_EXTENSION));
+  }
+
+  @Override
+  public Extractor<String, JsonObject> getExtractor(WorkUnitState state) throws IOException {
+    return new DatePartitionedJsonFileExtractor(state);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5c678d9b/gobblin-core/src/main/java/org/apache/gobblin/source/DatePartitionedNestedRetriever.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/source/DatePartitionedNestedRetriever.java b/gobblin-core/src/main/java/org/apache/gobblin/source/DatePartitionedNestedRetriever.java
index a9ff257..4c33555 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/source/DatePartitionedNestedRetriever.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/source/DatePartitionedNestedRetriever.java
@@ -72,6 +72,8 @@ public class DatePartitionedNestedRetriever implements PartitionAwareFileRetriev
   private HadoopFsHelper helper;
   private final String expectedExtension;
   private Duration leadTimeDuration;
+  private boolean schemaInSourceDir;
+  private String schemaFile;
 
   public DatePartitionedNestedRetriever(String expectedExtension) {
     this.expectedExtension = expectedExtension;
@@ -91,6 +93,10 @@ public class DatePartitionedNestedRetriever implements PartitionAwareFileRetriev
     this.sourceDir = new Path(state.getProp(ConfigurationKeys.SOURCE_FILEBASED_DATA_DIRECTORY));
     this.leadTimeDuration = PartitionAwareFileRetrieverUtils.getLeadTimeDurationFromConfig(state);
     this.helper = new HadoopFsHelper(state);
+    this.schemaInSourceDir = state.getPropAsBoolean(ConfigurationKeys.SCHEMA_IN_SOURCE_DIR,
+        ConfigurationKeys.DEFAULT_SCHEMA_IN_SOURCE_DIR);
+    this.schemaFile = this.schemaInSourceDir ? state.getProp(ConfigurationKeys.SCHEMA_FILENAME,
+        ConfigurationKeys.DEFAULT_SCHEMA_FILENAME) : "";
   }
 
   @Override
@@ -201,7 +207,8 @@ public class DatePartitionedNestedRetriever implements PartitionAwareFileRetriev
     return new PathFilter() {
       @Override
       public boolean accept(Path path) {
-        return path.getName().endsWith(extension);
+        return path.getName().endsWith(extension) &&
+            !(schemaInSourceDir && path.getName().equals(schemaFile)) ;
       }
     };
   }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5c678d9b/gobblin-core/src/main/java/org/apache/gobblin/source/PartitionedFileSourceBase.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/source/PartitionedFileSourceBase.java b/gobblin-core/src/main/java/org/apache/gobblin/source/PartitionedFileSourceBase.java
index 9ec7707..1b54895 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/source/PartitionedFileSourceBase.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/source/PartitionedFileSourceBase.java
@@ -69,7 +69,7 @@ public abstract class PartitionedFileSourceBase<SCHEMA, DATA> extends FileBasedS
   public static final String DATE_PARTITIONED_SOURCE_PARTITION_SUFFIX =
       DATE_PARTITIONED_SOURCE_PREFIX + ".partition.suffix";
 
-  static final String DATE_PARTITIONED_SOURCE_PARTITION_PATTERN =
+  public static final String DATE_PARTITIONED_SOURCE_PARTITION_PATTERN =
       DATE_PARTITIONED_SOURCE_PREFIX + ".partition.pattern";
 
   public static final String DATE_PARTITIONED_SOURCE_PARTITION_GRANULARITY =
@@ -99,7 +99,7 @@ public abstract class PartitionedFileSourceBase<SCHEMA, DATA> extends FileBasedS
   * If this parameter is not specified the job will start reading data from
   * the beginning of Unix time.
   */
-  private static final String DATE_PARTITIONED_SOURCE_MIN_WATERMARK_VALUE =
+  public static final String DATE_PARTITIONED_SOURCE_MIN_WATERMARK_VALUE =
       DATE_PARTITIONED_SOURCE_PREFIX + ".min.watermark.value";
 
   /**
@@ -291,6 +291,11 @@ public abstract class PartitionedFileSourceBase<SCHEMA, DATA> extends FileBasedS
         singleWorkUnit.setProp(ConfigurationKeys.WORK_UNIT_HIGH_WATER_MARK_KEY, file.getWatermarkMsSinceEpoch());
         singleWorkUnit.setProp(ConfigurationKeys.WORK_UNIT_DATE_PARTITION_KEY, file.getWatermarkMsSinceEpoch());
 
+        if (this.sourceState.getPropAsBoolean(ConfigurationKeys.SCHEMA_IN_SOURCE_DIR,
+            ConfigurationKeys.DEFAULT_SCHEMA_IN_SOURCE_DIR)) {
+          addSchemaFile(file, singleWorkUnit);
+        }
+
         multiWorkUnitWeightedQueue.addWorkUnit(singleWorkUnit, file.getFileSize());
 
         this.fileCount++;
@@ -302,6 +307,17 @@ public abstract class PartitionedFileSourceBase<SCHEMA, DATA> extends FileBasedS
     }
   }
 
+  private void addSchemaFile(PartitionAwareFileRetriever.FileInfo dataFile, WorkUnit workUnit)
+      throws IOException {
+    Path schemaFile = new Path(new Path(dataFile.getFilePath()).getParent(),
+        workUnit.getProp(ConfigurationKeys.SCHEMA_FILENAME, ConfigurationKeys.DEFAULT_SCHEMA_FILENAME));
+    if (fs.exists(schemaFile)) {
+      workUnit.setProp(ConfigurationKeys.SOURCE_SCHEMA, schemaFile.toString());
+    } else {
+      throw new IOException("Schema file " + schemaFile + " does not exist.");
+    }
+  }
+
   /**
    * Gets the LWM for this job runs. The new LWM is the HWM of the previous run + 1 unit (day,hour,minute..etc).
    * If there was no previous execution then it is set to the given lowWaterMark + 1 unit.

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5c678d9b/gobblin-core/src/main/java/org/apache/gobblin/source/RegexBasedPartitionedRetriever.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/source/RegexBasedPartitionedRetriever.java b/gobblin-core/src/main/java/org/apache/gobblin/source/RegexBasedPartitionedRetriever.java
index e082bda..7d3ad92 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/source/RegexBasedPartitionedRetriever.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/source/RegexBasedPartitionedRetriever.java
@@ -48,6 +48,8 @@ public class RegexBasedPartitionedRetriever implements PartitionAwareFileRetriev
   private Path sourceDir;
   private final String expectedExtension;
   private Duration leadTime;
+  private boolean schemaInSourceDir;
+  private String schemaFile;
 
   public RegexBasedPartitionedRetriever(String expectedExtension) {
     this.expectedExtension = expectedExtension;
@@ -64,6 +66,10 @@ public class RegexBasedPartitionedRetriever implements PartitionAwareFileRetriev
     this.pattern = Pattern.compile(regexPattern);
     this.helper = new HadoopFsHelper(state);
     this.sourceDir = new Path(state.getProp(ConfigurationKeys.SOURCE_FILEBASED_DATA_DIRECTORY));
+    this.schemaInSourceDir = state.getPropAsBoolean(ConfigurationKeys.SCHEMA_IN_SOURCE_DIR,
+        ConfigurationKeys.DEFAULT_SCHEMA_IN_SOURCE_DIR);
+    this.schemaFile = this.schemaInSourceDir ? state.getProp(ConfigurationKeys.SCHEMA_FILENAME,
+        ConfigurationKeys.DEFAULT_SCHEMA_FILENAME) : "";
   }
 
   @Override
@@ -175,7 +181,8 @@ public class RegexBasedPartitionedRetriever implements PartitionAwareFileRetriev
     return new PathFilter() {
       @Override
       public boolean accept(Path path) {
-        return path.getName().endsWith(extension);
+        return path.getName().endsWith(extension) &&
+            !(schemaInSourceDir && path.getName().equals(schemaFile)) ;
       }
     };
   }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5c678d9b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/DatePartitionedJsonFileExtractor.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/DatePartitionedJsonFileExtractor.java b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/DatePartitionedJsonFileExtractor.java
new file mode 100644
index 0000000..b165a35
--- /dev/null
+++ b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/DatePartitionedJsonFileExtractor.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.source.extractor;
+
+import java.io.IOException;
+
+import org.apache.gobblin.configuration.WorkUnitState;
+
+public class DatePartitionedJsonFileExtractor extends SimpleJsonExtractor {
+
+  /** Adds no behavior of its own: all reading is delegated to {@link SimpleJsonExtractor}. */
+  public DatePartitionedJsonFileExtractor(WorkUnitState workUnitState) throws IOException {
+    super(workUnitState);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5c678d9b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/SimpleJsonExtractor.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/SimpleJsonExtractor.java b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/SimpleJsonExtractor.java
new file mode 100644
index 0000000..412a06e
--- /dev/null
+++ b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/SimpleJsonExtractor.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.source.extractor;
+
+import java.io.BufferedReader;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.source.extractor.hadoop.HadoopFsHelper;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.io.Closer;
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+
+/**
+ * An {@link Extractor} for line-delimited JSON: each line of the assigned input file is a json
+ * document conforming to a schema. Blank lines are skipped rather than treated as end of input.
+ */
+public class SimpleJsonExtractor implements Extractor<String, JsonObject> {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(SimpleJsonExtractor.class);
+  private static final Gson GSON = new Gson();
+  private final WorkUnitState workUnitState;
+  private final FileSystem fs;
+  private final BufferedReader bufferedReader;
+  private final Closer closer = Closer.create();
+
+  /** Connects to the configured file system and opens the assigned input file as UTF-8 text. */
+  public SimpleJsonExtractor(WorkUnitState workUnitState) throws IOException {
+    this.workUnitState = workUnitState;
+    HadoopFsHelper fsHelper = new HadoopFsHelper(workUnitState);
+    try {
+      fsHelper.connect();
+    } catch (Exception e) {
+      // Chain the cause so the underlying connection failure is not lost.
+      throw new IOException("Exception at SimpleJsonExtractor", e);
+    }
+    // Source is responsible to set SOURCE_FILEBASED_FILES_TO_PULL
+    this.fs = fsHelper.getFileSystem();
+    InputStreamReader isr = new InputStreamReader(this.fs.open(
+        new Path(workUnitState.getProp(ConfigurationKeys.SOURCE_FILEBASED_FILES_TO_PULL))), StandardCharsets.UTF_8);
+    this.bufferedReader = this.closer.register(new BufferedReader(isr));
+  }
+
+  /** Reads the SOURCE_SCHEMA file and also stores its text under CONVERTER_AVRO_SCHEMA_KEY. */
+  @Override
+  public String getSchema() throws IOException {
+    // Source is responsible to set SOURCE_SCHEMA; close = true makes copyBytes close the schema stream.
+    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+    IOUtils.copyBytes(fs.open(
+        new Path(workUnitState.getProp(ConfigurationKeys.SOURCE_SCHEMA))), outputStream, 4096, true);
+    String schema = new String(outputStream.toByteArray(), StandardCharsets.UTF_8);
+    workUnitState.setProp(ConfigurationKeys.CONVERTER_AVRO_SCHEMA_KEY, schema);
+    return schema;
+  }
+
+  /** Returns the next JSON document, or {@code null} once the end of the file is reached. */
+  @Override
+  public JsonObject readRecord(@Deprecated JsonObject reuse) throws DataRecordException, IOException {
+    String jsonString = this.bufferedReader.readLine();
+    // Skip blank lines: Gson maps them to null, the end-of-input value, which would drop later records.
+    while (jsonString != null && jsonString.trim().isEmpty()) {
+      jsonString = this.bufferedReader.readLine();
+    }
+    return GSON.fromJson(jsonString, JsonObject.class);
+  }
+
+  @Override
+  public long getExpectedRecordCount() {
+    // We don't know how many records are in the file before actually reading them
+    return 0;
+  }
+
+  @Override
+  public long getHighWatermark() {
+    // Watermark is not applicable for this type of extractor
+    return 0;
+  }
+
+  /** Closes the input reader and then the file system, logging rather than propagating failures. */
+  @Override
+  public void close() throws IOException {
+    try {
+      this.closer.close();
+    } catch (IOException ioe) {
+      LOGGER.error("Failed to close the input stream", ioe);
+    }
+
+    try {
+      fs.close();
+    } catch (IOException ioe) {
+      LOGGER.error("Failed to close the file object", ioe);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5c678d9b/gobblin-core/src/test/java/org/apache/gobblin/converter/avro/JsonRecordAvroSchemaToAvroConverterTest.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/test/java/org/apache/gobblin/converter/avro/JsonRecordAvroSchemaToAvroConverterTest.java b/gobblin-core/src/test/java/org/apache/gobblin/converter/avro/JsonRecordAvroSchemaToAvroConverterTest.java
index 9971d83..4cf6898 100644
--- a/gobblin-core/src/test/java/org/apache/gobblin/converter/avro/JsonRecordAvroSchemaToAvroConverterTest.java
+++ b/gobblin-core/src/test/java/org/apache/gobblin/converter/avro/JsonRecordAvroSchemaToAvroConverterTest.java
@@ -23,6 +23,7 @@ import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericArray;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.commons.io.IOUtils;
+import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.SourceState;
 import org.apache.gobblin.configuration.WorkUnitState;
 import org.apache.gobblin.source.workunit.Extract.TableType;
@@ -52,8 +53,8 @@ public class JsonRecordAvroSchemaToAvroConverterTest {
     SourceState source = new SourceState();
     this.state = new WorkUnitState(
         source.createWorkUnit(source.createExtract(TableType.SNAPSHOT_ONLY, "test_table", "test_namespace")));
-    this.state.setProp(JsonRecordAvroSchemaToAvroConverter.AVRO_SCHEMA_KEY, avroSchemaString);
-    this.state.setProp(JsonRecordAvroSchemaToAvroConverter.IGNORE_FIELDS, "fieldToIgnore");
+    this.state.setProp(ConfigurationKeys.CONVERTER_AVRO_SCHEMA_KEY, avroSchemaString);
+    this.state.setProp(ConfigurationKeys.CONVERTER_IGNORE_FIELDS, "fieldToIgnore");
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5c678d9b/gobblin-core/src/test/java/org/apache/gobblin/source/extractor/filebased/FileBasedSourceTest.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/test/java/org/apache/gobblin/source/extractor/filebased/FileBasedSourceTest.java b/gobblin-core/src/test/java/org/apache/gobblin/source/extractor/filebased/FileBasedSourceTest.java
index 8cceff2..95b3656 100644
--- a/gobblin-core/src/test/java/org/apache/gobblin/source/extractor/filebased/FileBasedSourceTest.java
+++ b/gobblin-core/src/test/java/org/apache/gobblin/source/extractor/filebased/FileBasedSourceTest.java
@@ -21,10 +21,13 @@ import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.SourceState;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.source.DatePartitionedJsonFileSource;
+import org.apache.gobblin.source.PartitionedFileSourceBase;
 import org.apache.gobblin.source.extractor.DataRecordException;
 import org.apache.gobblin.source.extractor.Extractor;
 import org.apache.gobblin.source.workunit.Extract;
 import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.hadoop.fs.Path;
 import org.junit.Assert;
 import org.testng.annotations.Test;
 import org.testng.collections.Lists;
@@ -32,6 +35,7 @@ import org.testng.collections.Lists;
 import java.io.IOException;
 import java.util.List;
 
+
 @Test
 public class FileBasedSourceTest {
     @Test
@@ -57,6 +61,25 @@ public class FileBasedSourceTest {
         }
     }
 
+    @Test void numberOfWorkUnits() throws IOException {
+        SourceState sourceState = new SourceState();
+        DatePartitionedJsonFileSource source = new DatePartitionedJsonFileSource();
+        initState(sourceState);
+        List<WorkUnit> workUnits = source.getWorkunits(sourceState);
+        Assert.assertEquals(3, workUnits.size());
+    }
+
+    private void initState(State state) {
+        state.setProp(ConfigurationKeys.SOURCE_FILEBASED_DATA_DIRECTORY,
+            new Path(getClass().getResource("/source").toString()).toString());
+        state.setProp(PartitionedFileSourceBase.DATE_PARTITIONED_SOURCE_PARTITION_PATTERN, "yyyy-MM");
+        state.setProp(PartitionedFileSourceBase.DATE_PARTITIONED_SOURCE_MIN_WATERMARK_VALUE, "2017-11");
+        state.setProp(ConfigurationKeys.EXTRACT_TABLE_TYPE_KEY, "snapshot_only");
+        state.setProp(ConfigurationKeys.SOURCE_FILEBASED_FS_URI, "file:///");
+        state.setProp(ConfigurationKeys.SCHEMA_IN_SOURCE_DIR, "true");
+        state.setProp(ConfigurationKeys.SCHEMA_FILENAME, "metadata.json");
+    }
+
     private static class DummyFileBasedSource extends FileBasedSource<String, String> {
         @Override
         public void initFileSystemHelper(State state) throws FileBasedHelperException {

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5c678d9b/gobblin-core/src/test/resources/source/2017-12/metadata.json
----------------------------------------------------------------------
diff --git a/gobblin-core/src/test/resources/source/2017-12/metadata.json b/gobblin-core/src/test/resources/source/2017-12/metadata.json
new file mode 100644
index 0000000..0003f63
--- /dev/null
+++ b/gobblin-core/src/test/resources/source/2017-12/metadata.json
@@ -0,0 +1 @@
+{"namespace":"example.avro", "type":"record", "name":"User", "fields":[{"name":"name", "type":"string"}, {"name":"favorite_number",  "type":"int"}, {"name":"favorite_color", "type":"string"}]}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5c678d9b/gobblin-core/src/test/resources/source/2017-12/simplejson.json
----------------------------------------------------------------------
diff --git a/gobblin-core/src/test/resources/source/2017-12/simplejson.json b/gobblin-core/src/test/resources/source/2017-12/simplejson.json
new file mode 100644
index 0000000..c325df0
--- /dev/null
+++ b/gobblin-core/src/test/resources/source/2017-12/simplejson.json
@@ -0,0 +1,3 @@
+{"name": "Alyssa", "favorite_number": 256, "favorite_color": "yellow"}
+{"name": "Ben", "favorite_number": 7, "favorite_color": "red"}
+{"name": "Charlie", "favorite_number": 68, "favorite_color": "blue"}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5c678d9b/gobblin-core/src/test/resources/source/2018-01/metadata.json
----------------------------------------------------------------------
diff --git a/gobblin-core/src/test/resources/source/2018-01/metadata.json b/gobblin-core/src/test/resources/source/2018-01/metadata.json
new file mode 100644
index 0000000..0003f63
--- /dev/null
+++ b/gobblin-core/src/test/resources/source/2018-01/metadata.json
@@ -0,0 +1 @@
+{"namespace":"example.avro", "type":"record", "name":"User", "fields":[{"name":"name", "type":"string"}, {"name":"favorite_number",  "type":"int"}, {"name":"favorite_color", "type":"string"}]}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5c678d9b/gobblin-core/src/test/resources/source/2018-01/simplejson.json
----------------------------------------------------------------------
diff --git a/gobblin-core/src/test/resources/source/2018-01/simplejson.json b/gobblin-core/src/test/resources/source/2018-01/simplejson.json
new file mode 100644
index 0000000..c325df0
--- /dev/null
+++ b/gobblin-core/src/test/resources/source/2018-01/simplejson.json
@@ -0,0 +1,3 @@
+{"name": "Alyssa", "favorite_number": 256, "favorite_color": "yellow"}
+{"name": "Ben", "favorite_number": 7, "favorite_color": "red"}
+{"name": "Charlie", "favorite_number": 68, "favorite_color": "blue"}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5c678d9b/gobblin-core/src/test/resources/source/2018-01/simplejson2.json
----------------------------------------------------------------------
diff --git a/gobblin-core/src/test/resources/source/2018-01/simplejson2.json b/gobblin-core/src/test/resources/source/2018-01/simplejson2.json
new file mode 100644
index 0000000..c325df0
--- /dev/null
+++ b/gobblin-core/src/test/resources/source/2018-01/simplejson2.json
@@ -0,0 +1,3 @@
+{"name": "Alyssa", "favorite_number": 256, "favorite_color": "yellow"}
+{"name": "Ben", "favorite_number": 7, "favorite_color": "red"}
+{"name": "Charlie", "favorite_number": 68, "favorite_color": "blue"}