You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2017/11/09 00:42:55 UTC
incubator-gobblin git commit: [GOBBLIN-305] Add csv-kafka kafka-hdfs job template
Repository: incubator-gobblin
Updated Branches:
refs/heads/master 43d5ed520 -> 1b7748a68
[GOBBLIN-305] Add csv-kafka kafka-hdfs job template
Closes #2160 from zxcware/odsc2
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/1b7748a6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/1b7748a6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/1b7748a6
Branch: refs/heads/master
Commit: 1b7748a688ea5b00b0d8bfb6c8e82d32e635482b
Parents: 43d5ed5
Author: zhchen <zh...@linkedin.com>
Authored: Wed Nov 8 16:42:46 2017 -0800
Committer: Hung Tran <hu...@linkedin.com>
Committed: Wed Nov 8 16:42:46 2017 -0800
----------------------------------------------------------------------
.../converter/csv/CsvToJsonConverterV2.java | 69 ++++++++++++++++--
.../extractor/filebased/FileBasedSource.java | 9 +--
.../test/resources/converter/csv/10_fields.json | 16 ++---
.../converter/csv/11_fields_with_null.json | 16 ++---
.../main/resources/templates/csv-kafka.template | 74 ++++++++++++++++++++
.../resources/templates/kafka-hdfs.template | 69 ++++++++++++++++++
6 files changed, 229 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1b7748a6/gobblin-core/src/main/java/org/apache/gobblin/converter/csv/CsvToJsonConverterV2.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/converter/csv/CsvToJsonConverterV2.java b/gobblin-core/src/main/java/org/apache/gobblin/converter/csv/CsvToJsonConverterV2.java
index b4829f0..e69f277 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/converter/csv/CsvToJsonConverterV2.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/converter/csv/CsvToJsonConverterV2.java
@@ -30,6 +30,7 @@ import com.google.gson.JsonElement;
import com.google.gson.JsonNull;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
+import com.google.gson.JsonPrimitive;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.converter.Converter;
@@ -69,6 +70,8 @@ public class CsvToJsonConverterV2 extends Converter<String, JsonArray, String[],
private static final Logger LOG = LoggerFactory.getLogger(CsvToJsonConverterV2.class);
private static final JsonParser JSON_PARSER = new JsonParser();
private static final String COLUMN_NAME_KEY = "columnName";
+ private static final String DATA_TYPE_KEY = "dataType";
+ private static final String TYPE = "type";
private static final String JSON_NULL_VAL = "null";
public static final String CUSTOM_ORDERING = "converter.csv_to_json.custom_order";
@@ -173,12 +176,13 @@ public class CsvToJsonConverterV2 extends Converter<String, JsonArray, String[],
JsonObject outputRecord = new JsonObject();
for (int i = 0; i < outputSchema.size(); i++) {
- String key = outputSchema.get(i).getAsJsonObject().get(COLUMN_NAME_KEY).getAsString();
+ JsonObject field = outputSchema.get(i).getAsJsonObject();
+ String key = field.get(COLUMN_NAME_KEY).getAsString();
if (StringUtils.isEmpty(inputRecord[i]) || JSON_NULL_VAL.equalsIgnoreCase(inputRecord[i])) {
outputRecord.add(key, JsonNull.INSTANCE);
} else {
- outputRecord.addProperty(key, inputRecord[i]);
+ outputRecord.add(key, convertValue(inputRecord[i], field.getAsJsonObject(DATA_TYPE_KEY)));
}
}
@@ -195,16 +199,73 @@ public class CsvToJsonConverterV2 extends Converter<String, JsonArray, String[],
Iterator<String> customOrderIterator = customOrder.iterator();
while(outputSchemaIterator.hasNext() && customOrderIterator.hasNext()) {
- String key = outputSchemaIterator.next().getAsJsonObject().get(COLUMN_NAME_KEY).getAsString();
+ JsonObject field = outputSchemaIterator.next().getAsJsonObject();
+ String key = field.get(COLUMN_NAME_KEY).getAsString();
int i = Integer.parseInt(customOrderIterator.next());
Preconditions.checkArgument(i < inputRecord.length, "Index out of bound detected in customer order. Index: " + i + " , # of CSV columns: " + inputRecord.length);
if (i < 0 || null == inputRecord[i] || JSON_NULL_VAL.equalsIgnoreCase(inputRecord[i])) {
outputRecord.add(key, JsonNull.INSTANCE);
continue;
}
- outputRecord.addProperty(key, inputRecord[i]);
+ outputRecord.add(key, convertValue(inputRecord[i], field.getAsJsonObject(DATA_TYPE_KEY)));
}
return outputRecord;
}
+
+  /**
+   * Convert a raw CSV string value into a JSON element of the type declared in the column schema.
+   *
+   * @param value the raw CSV cell value; callers only pass non-null values
+   * @param dataType the column's {@code dataType} object from the output schema; may be null
+   * @return a {@link JsonPrimitive} of the declared type; the raw string when no scalar type is declared
+   *         or the declared type has no dedicated conversion; {@link JsonNull#INSTANCE} for a blank value
+   *         in a non-string column
+   */
+  private JsonElement convertValue(String value, JsonObject dataType) {
+    if (dataType == null || !dataType.has(TYPE) || !dataType.get(TYPE).isJsonPrimitive()) {
+      // No scalar type declared (absent, null, or e.g. a union given as a JSON array): keep the string.
+      return new JsonPrimitive(value);
+    }
+
+    // Locale.ROOT: default-locale upper-casing (e.g. Turkish) would turn "int" into "İNT".
+    String type = dataType.get(TYPE).getAsString().toUpperCase(java.util.Locale.ROOT);
+    ValueType valueType;
+    try {
+      valueType = ValueType.valueOf(type);
+    } catch (IllegalArgumentException e) {
+      // Types without a dedicated conversion (e.g. date, timestamp, enum) keep their string form
+      // instead of failing the whole record.
+      return new JsonPrimitive(value);
+    }
+
+    if (valueType != ValueType.STRING && value.trim().isEmpty()) {
+      // A blank cell cannot be parsed as a typed value; emit null, as the positional (non-custom-order)
+      // path already does for empty cells.
+      return JsonNull.INSTANCE;
+    }
+    return valueType.convert(value);
+  }
+
+  /**
+   * Conversions from a raw CSV string to a typed JSON value, selected by the upper-cased schema type name.
+   * Numeric conversions throw {@link NumberFormatException} for values that are not numbers.
+   */
+  private enum ValueType {
+    INT {
+      @Override
+      JsonElement convert(String value) {
+        try {
+          return new JsonPrimitive(Integer.parseInt(value));
+        } catch (NumberFormatException e) {
+          // Not a plain in-range integer literal (e.g. "42.0"): lenient double parse, truncating.
+          return new JsonPrimitive(Double.valueOf(value).intValue());
+        }
+      }
+    },
+    LONG {
+      @Override
+      JsonElement convert(String value) {
+        try {
+          // Parse exactly: a round trip through double loses precision for magnitudes above 2^53.
+          return new JsonPrimitive(Long.parseLong(value));
+        } catch (NumberFormatException e) {
+          // Not a plain in-range integer literal (e.g. "42.0"): lenient double parse, truncating.
+          return new JsonPrimitive(Double.valueOf(value).longValue());
+        }
+      }
+    },
+    FLOAT {
+      @Override
+      JsonElement convert(String value) {
+        // Parse directly as float to avoid rounding twice through an intermediate double.
+        return new JsonPrimitive(Float.valueOf(value));
+      }
+    },
+    DOUBLE {
+      @Override
+      JsonElement convert(String value) {
+        return new JsonPrimitive(Double.valueOf(value));
+      }
+    },
+    BOOLEAN {
+      @Override
+      JsonElement convert(String value) {
+        // Boolean.valueOf: "true" (any case) is true; every other string is false.
+        return new JsonPrimitive(Boolean.valueOf(value));
+      }
+    },
+    STRING {
+      @Override
+      JsonElement convert(String value) {
+        return new JsonPrimitive(value);
+      }
+    };
+
+    /** Convert {@code value} into a JSON element of this type. */
+    abstract JsonElement convert(String value);
+  }
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1b7748a6/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/filebased/FileBasedSource.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/filebased/FileBasedSource.java b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/filebased/FileBasedSource.java
index a26b052..d693f44 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/filebased/FileBasedSource.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/filebased/FileBasedSource.java
@@ -215,10 +215,11 @@ public abstract class FileBasedSource<S, D> extends AbstractSource<S, D> {
results = this.fsHelper.ls(path);
for (int i = 0; i < results.size(); i++) {
URI uri = new URI(results.get(i));
- File file = uri.isAbsolute()?
- new File(uri) : new File(state.getProp(ConfigurationKeys.SOURCE_FILEBASED_DATA_DIRECTORY), uri.toString());
-
- String filePath = file.getAbsolutePath();
+ String filePath = uri.toString();
+ if (!uri.isAbsolute()) {
+ File file = new File(state.getProp(ConfigurationKeys.SOURCE_FILEBASED_DATA_DIRECTORY), uri.toString());
+ filePath = file.getAbsolutePath();
+ }
results.set(i, filePath + this.splitPattern + this.fsHelper.getFileMTime(filePath));
}
} catch (FileBasedHelperException | URISyntaxException e) {
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1b7748a6/gobblin-core/src/test/resources/converter/csv/10_fields.json
----------------------------------------------------------------------
diff --git a/gobblin-core/src/test/resources/converter/csv/10_fields.json b/gobblin-core/src/test/resources/converter/csv/10_fields.json
index c59d524..ccc1349 100644
--- a/gobblin-core/src/test/resources/converter/csv/10_fields.json
+++ b/gobblin-core/src/test/resources/converter/csv/10_fields.json
@@ -1,12 +1,12 @@
{
"Date": "20160924",
"DeviceCategory": "desktop",
- "Sessions": "42935",
- "BounceRate": "0.0446255968324211",
- "AvgSessionDuration": "1590.4702457202748",
- "Pageviews": "348380",
- "PageviewsPerSession": "8.1141260044252945",
- "UniquePageviews": "232467",
- "AvgTimeOnPage": "206.98603475430664",
- "User_count": "33028"
+ "Sessions": 42935,
+ "BounceRate": 0.0446255968324211,
+ "AvgSessionDuration": 1590.4702457202748,
+ "Pageviews": 348380,
+ "PageviewsPerSession": 8.1141260044252945,
+ "UniquePageviews": 232467,
+ "AvgTimeOnPage": 206.98603475430664,
+ "User_count": 33028
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1b7748a6/gobblin-core/src/test/resources/converter/csv/11_fields_with_null.json
----------------------------------------------------------------------
diff --git a/gobblin-core/src/test/resources/converter/csv/11_fields_with_null.json b/gobblin-core/src/test/resources/converter/csv/11_fields_with_null.json
index 178de16..f4c0637 100644
--- a/gobblin-core/src/test/resources/converter/csv/11_fields_with_null.json
+++ b/gobblin-core/src/test/resources/converter/csv/11_fields_with_null.json
@@ -2,12 +2,12 @@
"Date": "20160924",
"DeviceCategory": "desktop",
"Segment": null,
- "Sessions": "42935",
- "BounceRate": "0.0446255968324211",
- "AvgSessionDuration": "1590.4702457202748",
- "Pageviews": "348380",
- "PageviewsPerSession": "8.1141260044252945",
- "UniquePageviews": "232467",
- "AvgTimeOnPage": "206.98603475430664",
- "User_count": "33028"
+ "Sessions": 42935,
+ "BounceRate": 0.0446255968324211,
+ "AvgSessionDuration": 1590.4702457202748,
+ "Pageviews": 348380,
+ "PageviewsPerSession": 8.1141260044252945,
+ "UniquePageviews": 232467,
+ "AvgTimeOnPage": 206.98603475430664,
+ "User_count": 33028
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1b7748a6/gobblin-runtime/src/main/resources/templates/csv-kafka.template
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/resources/templates/csv-kafka.template b/gobblin-runtime/src/main/resources/templates/csv-kafka.template
new file mode 100644
index 0000000..d85ce1a
--- /dev/null
+++ b/gobblin-runtime/src/main/resources/templates/csv-kafka.template
@@ -0,0 +1,74 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# ====================================================================
+# Job configurations
+# ====================================================================
+
+# Required configuration constraints
+gobblin.template.required_attributes="input.fs.uri,input.dir,job.work.dir,state.store.fs.uri,writer.fs.uri,csv.schema.json,csv.kafka.brokers,csv.kafka.topic"
+
+# Job info
+job.name=CsvKafka
+job.lock.enabled=false
+
+# Gobblin data storage configurations
+state.store.dir=${job.work.dir}/state-store
+task.data.root.dir=${job.work.dir}/task-data
+writer.staging.dir=${job.work.dir}/task-staging
+writer.output.dir=${job.work.dir}/task-output
+data.publisher.final.dir=${job.work.dir}/job-output
+
+# Gobblin MapReduce configurations
+mr.job.root.dir=${job.work.dir}/working
+mr.job.lock.dir=${job.work.dir}/locks
+
+# Source
+source.class=org.apache.gobblin.source.extractor.filebased.TextFileBasedSource
+source.filebased.downloader.class=org.apache.gobblin.source.extractor.filebased.CsvFileDownloader
+source.schema=${csv.schema.json}
+
+## Skip the CSV header line (the first record)
+source.skip.first.record=true
+source.filebased.fs.uri=${input.fs.uri}
+source.filebased.data.directory=${input.dir}
+source.filebased.maxFilesPerRun=1
+source.max.number.of.partitions=3
+
+## Extract
+extract.namespace=data
+extract.table.name=csv
+extract.table.type=SNAPSHOT_APPEND
+
+# Task execution configurations
+taskexecutor.threadpool.size=4
+taskretry.threadpool.coresize=1
+taskretry.threadpool.maxsize=1
+
+# Converter
+converter.classes=org.apache.gobblin.converter.csv.CsvToJsonConverterV2
+
+# Writer
+writer.destination.type=KAFKA
+writer.output.format=json
+writer.builder.class=org.apache.gobblin.kafka.writer.Kafka09JsonObjectWriterBuilder
+
+writer.kafka.topic=${csv.kafka.topic}
+writer.kafka.producerConfig.value.serializer=org.apache.kafka.common.serialization.StringSerializer
+writer.kafka.producerConfig.bootstrap.servers=${csv.kafka.brokers}
+writer.kafka.producerConfig.retries=3
+writer.kafka.producerConfig.client.id=CsvKafka
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1b7748a6/gobblin-runtime/src/main/resources/templates/kafka-hdfs.template
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/resources/templates/kafka-hdfs.template b/gobblin-runtime/src/main/resources/templates/kafka-hdfs.template
new file mode 100644
index 0000000..773f848
--- /dev/null
+++ b/gobblin-runtime/src/main/resources/templates/kafka-hdfs.template
@@ -0,0 +1,69 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# ====================================================================
+# Job configurations
+# ====================================================================
+
+# Required configuration constraints
+gobblin.template.required_attributes="job.work.dir,state.store.fs.uri,writer.fs.uri,kafka.schema.json,kafka.brokers,kafka.topics"
+
+# Job info
+job.name=KafkaHDFS
+job.lock.enabled=false
+
+# Gobblin data storage configurations
+state.store.dir=${job.work.dir}/state-store
+task.data.root.dir=${job.work.dir}/task-data
+writer.staging.dir=${job.work.dir}/task-staging
+writer.output.dir=${job.work.dir}/task-output
+data.publisher.final.dir=${job.work.dir}/job-output
+
+# Gobblin MapReduce configurations
+mr.job.root.dir=${job.work.dir}/working
+mr.job.lock.dir=${job.work.dir}/locks
+
+# Source
+source.class=org.apache.gobblin.source.extractor.extract.kafka.Kafka09JsonSource
+source.kafka.json.schema=${kafka.schema.json}
+
+topic.whitelist=${kafka.topics}
+org.apache.gobblin.kafka.consumerClient.class="org.apache.gobblin.kafka.client.Kafka09ConsumerClient$Factory"
+bootstrap.with.offset=earliest
+kafka.workunit.packer.type=SINGLE_LEVEL
+mr.job.max.mappers=10
+
+## Extract
+extract.namespace=kafka
+extract.table.type=SNAPSHOT_APPEND
+extract.limit.type=time
+extract.limit.enabled=true
+extract.limit.timeLimit=5
+extract.limit.timeLimitTimeunit=minutes
+
+# Task execution configurations
+taskexecutor.threadpool.size=4
+taskretry.threadpool.coresize=1
+taskretry.threadpool.maxsize=1
+
+# Converter
+converter.classes=org.apache.gobblin.converter.avro.JsonIntermediateToAvroConverter
+
+# Writer
+writer.destination.type=HDFS
+writer.output.format=AVRO
+writer.builder.class=org.apache.gobblin.writer.AvroDataWriterBuilder
\ No newline at end of file