You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by ab...@apache.org on 2018/03/21 08:30:45 UTC
[27/50] incubator-gobblin git commit: [GOBBLIN-396] add date
partitioned based json source
[GOBBLIN-396] add date partitioned based json source
Closes #2270 from
arjun4084346/jsonDatePartitionedSource
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/5c678d9b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/5c678d9b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/5c678d9b
Branch: refs/heads/0.12.0
Commit: 5c678d9b6b008c9fef0eeea731f2bf19e55e1cea
Parents: 457ede2
Author: Arjun <ab...@linkedin.com>
Authored: Thu Feb 8 11:40:19 2018 -0800
Committer: Hung Tran <hu...@linkedin.com>
Committed: Thu Feb 8 11:40:19 2018 -0800
----------------------------------------------------------------------
.../configuration/ConfigurationKeys.java | 6 +
.../JsonRecordAvroSchemaToAvroConverter.java | 11 +-
.../source/DatePartitionedJsonFileSource.java | 39 ++++++
.../source/DatePartitionedNestedRetriever.java | 9 +-
.../source/PartitionedFileSourceBase.java | 20 +++-
.../source/RegexBasedPartitionedRetriever.java | 9 +-
.../DatePartitionedJsonFileExtractor.java | 30 +++++
.../source/extractor/SimpleJsonExtractor.java | 118 +++++++++++++++++++
...JsonRecordAvroSchemaToAvroConverterTest.java | 5 +-
.../filebased/FileBasedSourceTest.java | 23 ++++
.../test/resources/source/2017-12/metadata.json | 1 +
.../resources/source/2017-12/simplejson.json | 3 +
.../test/resources/source/2018-01/metadata.json | 1 +
.../resources/source/2018-01/simplejson.json | 3 +
.../resources/source/2018-01/simplejson2.json | 3 +
15 files changed, 268 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5c678d9b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
----------------------------------------------------------------------
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 267a17e..d07d740 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -302,6 +302,8 @@ public class ConfigurationKeys {
public static final String CONVERTER_AVRO_FIELD_PICK_FIELDS = "converter.avro.fields";
public static final String CONVERTER_AVRO_JDBC_ENTRY_FIELDS_PAIRS = "converter.avro.jdbc.entry_fields_pairs";
public static final String CONVERTER_SKIP_FAILED_RECORD = "converter.skipFailedRecord";
+ public static final String CONVERTER_AVRO_SCHEMA_KEY = "converter.avroSchema";
+ public static final String CONVERTER_IGNORE_FIELDS = "converter.ignoreFields";
/**
* Fork operator configuration properties.
@@ -452,6 +454,10 @@ public class ConfigurationKeys {
* Configuration properties used by the extractor.
*/
public static final String SOURCE_ENTITY = "source.entity";
+ public static final String SCHEMA_IN_SOURCE_DIR = "schema.in.source.dir";
+ public static final boolean DEFAULT_SCHEMA_IN_SOURCE_DIR = false;
+ public static final String SCHEMA_FILENAME = "schema.filename";
+ public static final String DEFAULT_SCHEMA_FILENAME = "metadata.json";
// Comma-separated source entity names
public static final String SOURCE_ENTITIES = "source.entities";
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5c678d9b/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonRecordAvroSchemaToAvroConverter.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonRecordAvroSchemaToAvroConverter.java b/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonRecordAvroSchemaToAvroConverter.java
index 11f85f4..8e25975 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonRecordAvroSchemaToAvroConverter.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonRecordAvroSchemaToAvroConverter.java
@@ -22,6 +22,7 @@ import java.util.List;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
+import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.converter.Converter;
import org.apache.gobblin.converter.DataConversionException;
@@ -42,18 +43,12 @@ import com.google.common.base.Splitter;
public class JsonRecordAvroSchemaToAvroConverter<SI> extends ToAvroConverterBase<SI, JsonObject> {
private static final Splitter SPLITTER_ON_COMMA = Splitter.on(',').trimResults().omitEmptyStrings();
-
- public static final String AVRO_SCHEMA_KEY = "converter.avroSchema";
- public static final String IGNORE_FIELDS = "converter.ignoreFields";
-
private Schema schema;
private List<String> ignoreFields;
public ToAvroConverterBase<SI, JsonObject> init(WorkUnitState workUnit) {
super.init(workUnit);
- Preconditions.checkArgument(workUnit.contains(AVRO_SCHEMA_KEY));
- this.schema = new Schema.Parser().parse(workUnit.getProp(AVRO_SCHEMA_KEY));
- this.ignoreFields = SPLITTER_ON_COMMA.splitToList(workUnit.getProp(IGNORE_FIELDS, ""));
+ this.ignoreFields = SPLITTER_ON_COMMA.splitToList(workUnit.getProp(ConfigurationKeys.CONVERTER_IGNORE_FIELDS, ""));
return this;
}
@@ -62,6 +57,8 @@ public class JsonRecordAvroSchemaToAvroConverter<SI> extends ToAvroConverterBase
*/
@Override
public Schema convertSchema(SI inputSchema, WorkUnitState workUnit) throws SchemaConversionException {
+ Preconditions.checkArgument(workUnit.contains(ConfigurationKeys.CONVERTER_AVRO_SCHEMA_KEY));
+ this.schema = new Schema.Parser().parse(workUnit.getProp(ConfigurationKeys.CONVERTER_AVRO_SCHEMA_KEY));
return this.schema;
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5c678d9b/gobblin-core/src/main/java/org/apache/gobblin/source/DatePartitionedJsonFileSource.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/source/DatePartitionedJsonFileSource.java b/gobblin-core/src/main/java/org/apache/gobblin/source/DatePartitionedJsonFileSource.java
new file mode 100644
index 0000000..cc5167a
--- /dev/null
+++ b/gobblin-core/src/main/java/org/apache/gobblin/source/DatePartitionedJsonFileSource.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.source;
+
+import java.io.IOException;
+
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.source.extractor.DatePartitionedJsonFileExtractor;
+import org.apache.gobblin.source.extractor.Extractor;
+
+import com.google.gson.JsonObject;
+
+/**
+ * A {@link PartitionedFileSourceBase} for date-partitioned directories of JSON files.
+ *
+ * <p>
+ * Files are discovered by a {@link DatePartitionedNestedRetriever} configured with the
+ * {@code ".json"} extension, and every resulting work unit is read by a
+ * {@link DatePartitionedJsonFileExtractor}.
+ * </p>
+ */
+public class DatePartitionedJsonFileSource extends PartitionedFileSourceBase<String, JsonObject> {
+
+  /** File-name extension handed to the retriever. */
+  private static final String JSON_EXTENSION = ".json";
+
+  public DatePartitionedJsonFileSource() {
+    super(new DatePartitionedNestedRetriever(JSON_EXTENSION));
+  }
+
+  /**
+   * Creates the extractor for a single work unit.
+   *
+   * @param state the state of the work unit to read
+   * @return a new {@link DatePartitionedJsonFileExtractor} bound to {@code state}
+   * @throws IOException if the extractor cannot connect to the file system or open its input
+   */
+  @Override
+  public Extractor<String, JsonObject> getExtractor(WorkUnitState state) throws IOException {
+    return new DatePartitionedJsonFileExtractor(state);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5c678d9b/gobblin-core/src/main/java/org/apache/gobblin/source/DatePartitionedNestedRetriever.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/source/DatePartitionedNestedRetriever.java b/gobblin-core/src/main/java/org/apache/gobblin/source/DatePartitionedNestedRetriever.java
index a9ff257..4c33555 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/source/DatePartitionedNestedRetriever.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/source/DatePartitionedNestedRetriever.java
@@ -72,6 +72,8 @@ public class DatePartitionedNestedRetriever implements PartitionAwareFileRetriev
private HadoopFsHelper helper;
private final String expectedExtension;
private Duration leadTimeDuration;
+ private boolean schemaInSourceDir;
+ private String schemaFile;
public DatePartitionedNestedRetriever(String expectedExtension) {
this.expectedExtension = expectedExtension;
@@ -91,6 +93,10 @@ public class DatePartitionedNestedRetriever implements PartitionAwareFileRetriev
this.sourceDir = new Path(state.getProp(ConfigurationKeys.SOURCE_FILEBASED_DATA_DIRECTORY));
this.leadTimeDuration = PartitionAwareFileRetrieverUtils.getLeadTimeDurationFromConfig(state);
this.helper = new HadoopFsHelper(state);
+ this.schemaInSourceDir = state.getPropAsBoolean(ConfigurationKeys.SCHEMA_IN_SOURCE_DIR,
+ ConfigurationKeys.DEFAULT_SCHEMA_IN_SOURCE_DIR);
+ this.schemaFile = this.schemaInSourceDir ? state.getProp(ConfigurationKeys.SCHEMA_FILENAME,
+ ConfigurationKeys.DEFAULT_SCHEMA_FILENAME) : "";
}
@Override
@@ -201,7 +207,8 @@ public class DatePartitionedNestedRetriever implements PartitionAwareFileRetriev
return new PathFilter() {
@Override
public boolean accept(Path path) {
- return path.getName().endsWith(extension);
+ return path.getName().endsWith(extension) &&
+ !(schemaInSourceDir && path.getName().equals(schemaFile)) ;
}
};
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5c678d9b/gobblin-core/src/main/java/org/apache/gobblin/source/PartitionedFileSourceBase.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/source/PartitionedFileSourceBase.java b/gobblin-core/src/main/java/org/apache/gobblin/source/PartitionedFileSourceBase.java
index 9ec7707..1b54895 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/source/PartitionedFileSourceBase.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/source/PartitionedFileSourceBase.java
@@ -69,7 +69,7 @@ public abstract class PartitionedFileSourceBase<SCHEMA, DATA> extends FileBasedS
public static final String DATE_PARTITIONED_SOURCE_PARTITION_SUFFIX =
DATE_PARTITIONED_SOURCE_PREFIX + ".partition.suffix";
- static final String DATE_PARTITIONED_SOURCE_PARTITION_PATTERN =
+ public static final String DATE_PARTITIONED_SOURCE_PARTITION_PATTERN =
DATE_PARTITIONED_SOURCE_PREFIX + ".partition.pattern";
public static final String DATE_PARTITIONED_SOURCE_PARTITION_GRANULARITY =
@@ -99,7 +99,7 @@ public abstract class PartitionedFileSourceBase<SCHEMA, DATA> extends FileBasedS
* If this parameter is not specified the job will start reading data from
* the beginning of Unix time.
*/
- private static final String DATE_PARTITIONED_SOURCE_MIN_WATERMARK_VALUE =
+ public static final String DATE_PARTITIONED_SOURCE_MIN_WATERMARK_VALUE =
DATE_PARTITIONED_SOURCE_PREFIX + ".min.watermark.value";
/**
@@ -291,6 +291,11 @@ public abstract class PartitionedFileSourceBase<SCHEMA, DATA> extends FileBasedS
singleWorkUnit.setProp(ConfigurationKeys.WORK_UNIT_HIGH_WATER_MARK_KEY, file.getWatermarkMsSinceEpoch());
singleWorkUnit.setProp(ConfigurationKeys.WORK_UNIT_DATE_PARTITION_KEY, file.getWatermarkMsSinceEpoch());
+ if (this.sourceState.getPropAsBoolean(ConfigurationKeys.SCHEMA_IN_SOURCE_DIR,
+ ConfigurationKeys.DEFAULT_SCHEMA_IN_SOURCE_DIR)) {
+ addSchemaFile(file, singleWorkUnit);
+ }
+
multiWorkUnitWeightedQueue.addWorkUnit(singleWorkUnit, file.getFileSize());
this.fileCount++;
@@ -302,6 +307,17 @@ public abstract class PartitionedFileSourceBase<SCHEMA, DATA> extends FileBasedS
}
}
+ private void addSchemaFile(PartitionAwareFileRetriever.FileInfo dataFile, WorkUnit workUnit)
+ throws IOException {
+ Path schemaFile = new Path(new Path(dataFile.getFilePath()).getParent(),
+ workUnit.getProp(ConfigurationKeys.SCHEMA_FILENAME, ConfigurationKeys.DEFAULT_SCHEMA_FILENAME));
+ if (fs.exists(schemaFile)) {
+ workUnit.setProp(ConfigurationKeys.SOURCE_SCHEMA, schemaFile.toString());
+ } else {
+ throw new IOException("Schema file " + schemaFile + " does not exist.");
+ }
+ }
+
/**
* Gets the LWM for this job runs. The new LWM is the HWM of the previous run + 1 unit (day,hour,minute..etc).
* If there was no previous execution then it is set to the given lowWaterMark + 1 unit.
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5c678d9b/gobblin-core/src/main/java/org/apache/gobblin/source/RegexBasedPartitionedRetriever.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/source/RegexBasedPartitionedRetriever.java b/gobblin-core/src/main/java/org/apache/gobblin/source/RegexBasedPartitionedRetriever.java
index e082bda..7d3ad92 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/source/RegexBasedPartitionedRetriever.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/source/RegexBasedPartitionedRetriever.java
@@ -48,6 +48,8 @@ public class RegexBasedPartitionedRetriever implements PartitionAwareFileRetriev
private Path sourceDir;
private final String expectedExtension;
private Duration leadTime;
+ private boolean schemaInSourceDir;
+ private String schemaFile;
public RegexBasedPartitionedRetriever(String expectedExtension) {
this.expectedExtension = expectedExtension;
@@ -64,6 +66,10 @@ public class RegexBasedPartitionedRetriever implements PartitionAwareFileRetriev
this.pattern = Pattern.compile(regexPattern);
this.helper = new HadoopFsHelper(state);
this.sourceDir = new Path(state.getProp(ConfigurationKeys.SOURCE_FILEBASED_DATA_DIRECTORY));
+ this.schemaInSourceDir = state.getPropAsBoolean(ConfigurationKeys.SCHEMA_IN_SOURCE_DIR,
+ ConfigurationKeys.DEFAULT_SCHEMA_IN_SOURCE_DIR);
+ this.schemaFile = this.schemaInSourceDir ? state.getProp(ConfigurationKeys.SCHEMA_FILENAME,
+ ConfigurationKeys.DEFAULT_SCHEMA_FILENAME) : "";
}
@Override
@@ -175,7 +181,8 @@ public class RegexBasedPartitionedRetriever implements PartitionAwareFileRetriev
return new PathFilter() {
@Override
public boolean accept(Path path) {
- return path.getName().endsWith(extension);
+ return path.getName().endsWith(extension) &&
+ !(schemaInSourceDir && path.getName().equals(schemaFile)) ;
}
};
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5c678d9b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/DatePartitionedJsonFileExtractor.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/DatePartitionedJsonFileExtractor.java b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/DatePartitionedJsonFileExtractor.java
new file mode 100644
index 0000000..b165a35
--- /dev/null
+++ b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/DatePartitionedJsonFileExtractor.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.source.extractor;
+
+import java.io.IOException;
+
+import org.apache.gobblin.configuration.WorkUnitState;
+
+/**
+ * The extractor used by {@code org.apache.gobblin.source.DatePartitionedJsonFileSource}.
+ *
+ * <p>
+ * It currently adds no behavior of its own: reading the newline-delimited JSON data file, loading the
+ * schema and closing resources are all inherited from {@link SimpleJsonExtractor}.
+ * </p>
+ */
+public class DatePartitionedJsonFileExtractor extends SimpleJsonExtractor {
+
+  /**
+   * Creates an extractor for the given work unit by delegating to {@link SimpleJsonExtractor}.
+   *
+   * @param workUnitState state of the work unit to extract
+   * @throws IOException if the underlying file system connection or data file open fails
+   */
+  public DatePartitionedJsonFileExtractor(WorkUnitState workUnitState) throws IOException {
+    super(workUnitState);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5c678d9b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/SimpleJsonExtractor.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/SimpleJsonExtractor.java b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/SimpleJsonExtractor.java
new file mode 100644
index 0000000..412a06e
--- /dev/null
+++ b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/SimpleJsonExtractor.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.source.extractor;
+
+import java.io.BufferedReader;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import org.apache.gobblin.source.extractor.hadoop.HadoopFsHelper;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.io.Closer;
+import com.google.gson.JsonObject;
+import com.google.gson.Gson;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.WorkUnitState;
+
+/**
+ * An implementation of {@link Extractor} for newline-delimited JSON files.
+ *
+ * <p>
+ * This extractor reads the input file named by {@link ConfigurationKeys#SOURCE_FILEBASED_FILES_TO_PULL},
+ * which stores JSON documents conforming to a schema, one document per line. The schema is read from
+ * the file named by {@link ConfigurationKeys#SOURCE_SCHEMA}. Both properties are expected to be set by
+ * the source.
+ * </p>
+ */
+public class SimpleJsonExtractor implements Extractor<String, JsonObject> {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(SimpleJsonExtractor.class);
+  private static final Gson GSON = new Gson();
+
+  private final WorkUnitState workUnitState;
+  private final FileSystem fs;
+  private final BufferedReader bufferedReader;
+  private final Closer closer = Closer.create();
+
+  /**
+   * Connects to the file system and opens the data file for reading.
+   *
+   * @param workUnitState state of the work unit to extract
+   * @throws IOException if connecting to the file system fails or the data file cannot be opened
+   */
+  public SimpleJsonExtractor(WorkUnitState workUnitState) throws IOException {
+    this.workUnitState = workUnitState;
+
+    HadoopFsHelper fsHelper = new HadoopFsHelper(workUnitState);
+    try {
+      fsHelper.connect();
+    } catch (Exception e) {
+      // Chain the original failure so the real connection error is not lost.
+      throw new IOException("Exception at SimpleJsonExtractor", e);
+    }
+    // Source is responsible to set SOURCE_FILEBASED_FILES_TO_PULL
+    this.fs = fsHelper.getFileSystem();
+    Path dataFile = new Path(workUnitState.getProp(ConfigurationKeys.SOURCE_FILEBASED_FILES_TO_PULL));
+    InputStreamReader isr = new InputStreamReader(this.fs.open(dataFile), StandardCharsets.UTF_8);
+    this.bufferedReader = this.closer.register(new BufferedReader(isr));
+  }
+
+  /**
+   * Reads the schema file and publishes its contents to the work unit.
+   *
+   * <p>
+   * The file named by {@link ConfigurationKeys#SOURCE_SCHEMA} is read in full as UTF-8 and also stored
+   * under {@link ConfigurationKeys#CONVERTER_AVRO_SCHEMA_KEY}, where
+   * {@code JsonRecordAvroSchemaToAvroConverter} looks for it.
+   * </p>
+   *
+   * @return the contents of the schema file
+   * @throws IOException if the schema file cannot be read
+   */
+  @Override
+  public String getSchema() throws IOException {
+    // Source is responsible to set SOURCE_SCHEMA
+    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+    // close=true: copyBytes closes the schema input stream when it is done (per Hadoop's IOUtils
+    // contract), so the stream opened here is not leaked.
+    IOUtils.copyBytes(fs.open(new Path(workUnitState.getProp(ConfigurationKeys.SOURCE_SCHEMA))),
+        outputStream, 4096, true);
+    String schema = new String(outputStream.toByteArray(), StandardCharsets.UTF_8);
+    workUnitState.setProp(ConfigurationKeys.CONVERTER_AVRO_SCHEMA_KEY, schema);
+    return schema;
+  }
+
+  /**
+   * Reads the next JSON document from the data file.
+   *
+   * <p>
+   * Blank and whitespace-only lines are skipped. Gson parses empty input to {@code null}, the same value
+   * returned here at end of file, so without this a blank line would look like the end of the data.
+   * </p>
+   *
+   * @param reuse ignored
+   * @return the next document, or {@code null} once the file is exhausted
+   * @throws IOException if reading the data file fails
+   */
+  @Override
+  public JsonObject readRecord(@Deprecated JsonObject reuse) throws DataRecordException, IOException {
+    String jsonString = this.bufferedReader.readLine();
+    while (jsonString != null && jsonString.trim().isEmpty()) {
+      jsonString = this.bufferedReader.readLine();
+    }
+    if (jsonString == null) {
+      return null;
+    }
+    return GSON.fromJson(jsonString, JsonObject.class);
+  }
+
+  @Override
+  public long getExpectedRecordCount() {
+    // We don't know how many records are in the file before actually reading them
+    return 0;
+  }
+
+  @Override
+  public long getHighWatermark() {
+    // Watermark is not applicable for this type of extractor
+    return 0;
+  }
+
+  /**
+   * Closes the data file reader and then the file system, logging rather than rethrowing failures.
+   */
+  @Override
+  public void close() throws IOException {
+    try {
+      this.closer.close();
+    } catch (IOException ioe) {
+      LOGGER.error("Failed to close the input stream", ioe);
+    }
+
+    // NOTE(review): if HadoopFsHelper returns a cached, shared FileSystem, closing it here may affect
+    // other readers in the same JVM -- confirm against HadoopFsHelper before relying on this.
+    try {
+      fs.close();
+    } catch (IOException ioe) {
+      LOGGER.error("Failed to close the file object", ioe);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5c678d9b/gobblin-core/src/test/java/org/apache/gobblin/converter/avro/JsonRecordAvroSchemaToAvroConverterTest.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/test/java/org/apache/gobblin/converter/avro/JsonRecordAvroSchemaToAvroConverterTest.java b/gobblin-core/src/test/java/org/apache/gobblin/converter/avro/JsonRecordAvroSchemaToAvroConverterTest.java
index 9971d83..4cf6898 100644
--- a/gobblin-core/src/test/java/org/apache/gobblin/converter/avro/JsonRecordAvroSchemaToAvroConverterTest.java
+++ b/gobblin-core/src/test/java/org/apache/gobblin/converter/avro/JsonRecordAvroSchemaToAvroConverterTest.java
@@ -23,6 +23,7 @@ import org.apache.avro.Schema;
import org.apache.avro.generic.GenericArray;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.io.IOUtils;
+import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.SourceState;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.source.workunit.Extract.TableType;
@@ -52,8 +53,8 @@ public class JsonRecordAvroSchemaToAvroConverterTest {
SourceState source = new SourceState();
this.state = new WorkUnitState(
source.createWorkUnit(source.createExtract(TableType.SNAPSHOT_ONLY, "test_table", "test_namespace")));
- this.state.setProp(JsonRecordAvroSchemaToAvroConverter.AVRO_SCHEMA_KEY, avroSchemaString);
- this.state.setProp(JsonRecordAvroSchemaToAvroConverter.IGNORE_FIELDS, "fieldToIgnore");
+ this.state.setProp(ConfigurationKeys.CONVERTER_AVRO_SCHEMA_KEY, avroSchemaString);
+ this.state.setProp(ConfigurationKeys.CONVERTER_IGNORE_FIELDS, "fieldToIgnore");
}
@Test
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5c678d9b/gobblin-core/src/test/java/org/apache/gobblin/source/extractor/filebased/FileBasedSourceTest.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/test/java/org/apache/gobblin/source/extractor/filebased/FileBasedSourceTest.java b/gobblin-core/src/test/java/org/apache/gobblin/source/extractor/filebased/FileBasedSourceTest.java
index 8cceff2..95b3656 100644
--- a/gobblin-core/src/test/java/org/apache/gobblin/source/extractor/filebased/FileBasedSourceTest.java
+++ b/gobblin-core/src/test/java/org/apache/gobblin/source/extractor/filebased/FileBasedSourceTest.java
@@ -21,10 +21,13 @@ import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.SourceState;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.source.DatePartitionedJsonFileSource;
+import org.apache.gobblin.source.PartitionedFileSourceBase;
import org.apache.gobblin.source.extractor.DataRecordException;
import org.apache.gobblin.source.extractor.Extractor;
import org.apache.gobblin.source.workunit.Extract;
import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.hadoop.fs.Path;
import org.junit.Assert;
import org.testng.annotations.Test;
import org.testng.collections.Lists;
@@ -32,6 +35,7 @@ import org.testng.collections.Lists;
import java.io.IOException;
import java.util.List;
+
@Test
public class FileBasedSourceTest {
@Test
@@ -57,6 +61,25 @@ public class FileBasedSourceTest {
}
}
+ @Test void numberOfWorkUnits() throws IOException {
+ SourceState sourceState = new SourceState();
+ DatePartitionedJsonFileSource source = new DatePartitionedJsonFileSource();
+ initState(sourceState);
+ List<WorkUnit> workUnits = source.getWorkunits(sourceState);
+ Assert.assertEquals(3, workUnits.size());
+ }
+
+ private void initState(State state) {
+ state.setProp(ConfigurationKeys.SOURCE_FILEBASED_DATA_DIRECTORY,
+ new Path(getClass().getResource("/source").toString()).toString());
+ state.setProp(PartitionedFileSourceBase.DATE_PARTITIONED_SOURCE_PARTITION_PATTERN, "yyyy-MM");
+ state.setProp(PartitionedFileSourceBase.DATE_PARTITIONED_SOURCE_MIN_WATERMARK_VALUE, "2017-11");
+ state.setProp(ConfigurationKeys.EXTRACT_TABLE_TYPE_KEY, "snapshot_only");
+ state.setProp(ConfigurationKeys.SOURCE_FILEBASED_FS_URI, "file:///");
+ state.setProp(ConfigurationKeys.SCHEMA_IN_SOURCE_DIR, "true");
+ state.setProp(ConfigurationKeys.SCHEMA_FILENAME, "metadata.json");
+ }
+
private static class DummyFileBasedSource extends FileBasedSource<String, String> {
@Override
public void initFileSystemHelper(State state) throws FileBasedHelperException {
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5c678d9b/gobblin-core/src/test/resources/source/2017-12/metadata.json
----------------------------------------------------------------------
diff --git a/gobblin-core/src/test/resources/source/2017-12/metadata.json b/gobblin-core/src/test/resources/source/2017-12/metadata.json
new file mode 100644
index 0000000..0003f63
--- /dev/null
+++ b/gobblin-core/src/test/resources/source/2017-12/metadata.json
@@ -0,0 +1 @@
+{"namespace":"example.avro", "type":"record", "name":"User", "fields":[{"name":"name", "type":"string"}, {"name":"favorite_number", "type":"int"}, {"name":"favorite_color", "type":"string"}]}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5c678d9b/gobblin-core/src/test/resources/source/2017-12/simplejson.json
----------------------------------------------------------------------
diff --git a/gobblin-core/src/test/resources/source/2017-12/simplejson.json b/gobblin-core/src/test/resources/source/2017-12/simplejson.json
new file mode 100644
index 0000000..c325df0
--- /dev/null
+++ b/gobblin-core/src/test/resources/source/2017-12/simplejson.json
@@ -0,0 +1,3 @@
+{"name": "Alyssa", "favorite_number": 256, "favorite_color": "yellow"}
+{"name": "Ben", "favorite_number": 7, "favorite_color": "red"}
+{"name": "Charlie", "favorite_number": 68, "favorite_color": "blue"}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5c678d9b/gobblin-core/src/test/resources/source/2018-01/metadata.json
----------------------------------------------------------------------
diff --git a/gobblin-core/src/test/resources/source/2018-01/metadata.json b/gobblin-core/src/test/resources/source/2018-01/metadata.json
new file mode 100644
index 0000000..0003f63
--- /dev/null
+++ b/gobblin-core/src/test/resources/source/2018-01/metadata.json
@@ -0,0 +1 @@
+{"namespace":"example.avro", "type":"record", "name":"User", "fields":[{"name":"name", "type":"string"}, {"name":"favorite_number", "type":"int"}, {"name":"favorite_color", "type":"string"}]}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5c678d9b/gobblin-core/src/test/resources/source/2018-01/simplejson.json
----------------------------------------------------------------------
diff --git a/gobblin-core/src/test/resources/source/2018-01/simplejson.json b/gobblin-core/src/test/resources/source/2018-01/simplejson.json
new file mode 100644
index 0000000..c325df0
--- /dev/null
+++ b/gobblin-core/src/test/resources/source/2018-01/simplejson.json
@@ -0,0 +1,3 @@
+{"name": "Alyssa", "favorite_number": 256, "favorite_color": "yellow"}
+{"name": "Ben", "favorite_number": 7, "favorite_color": "red"}
+{"name": "Charlie", "favorite_number": 68, "favorite_color": "blue"}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5c678d9b/gobblin-core/src/test/resources/source/2018-01/simplejson2.json
----------------------------------------------------------------------
diff --git a/gobblin-core/src/test/resources/source/2018-01/simplejson2.json b/gobblin-core/src/test/resources/source/2018-01/simplejson2.json
new file mode 100644
index 0000000..c325df0
--- /dev/null
+++ b/gobblin-core/src/test/resources/source/2018-01/simplejson2.json
@@ -0,0 +1,3 @@
+{"name": "Alyssa", "favorite_number": 256, "favorite_color": "yellow"}
+{"name": "Ben", "favorite_number": 7, "favorite_color": "red"}
+{"name": "Charlie", "favorite_number": 68, "favorite_color": "blue"}