You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2017/11/09 00:42:55 UTC

incubator-gobblin git commit: [GOBBLIN-305] Add csv-kafka kafka-hdfs job template

Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 43d5ed520 -> 1b7748a68


[GOBBLIN-305] Add csv-kafka kafka-hdfs job template

Closes #2160 from zxcware/odsc2


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/1b7748a6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/1b7748a6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/1b7748a6

Branch: refs/heads/master
Commit: 1b7748a688ea5b00b0d8bfb6c8e82d32e635482b
Parents: 43d5ed5
Author: zhchen <zh...@linkedin.com>
Authored: Wed Nov 8 16:42:46 2017 -0800
Committer: Hung Tran <hu...@linkedin.com>
Committed: Wed Nov 8 16:42:46 2017 -0800

----------------------------------------------------------------------
 .../converter/csv/CsvToJsonConverterV2.java     | 69 ++++++++++++++++--
 .../extractor/filebased/FileBasedSource.java    |  9 +--
 .../test/resources/converter/csv/10_fields.json | 16 ++---
 .../converter/csv/11_fields_with_null.json      | 16 ++---
 .../main/resources/templates/csv-kafka.template | 74 ++++++++++++++++++++
 .../resources/templates/kafka-hdfs.template     | 69 ++++++++++++++++++
 6 files changed, 229 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1b7748a6/gobblin-core/src/main/java/org/apache/gobblin/converter/csv/CsvToJsonConverterV2.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/converter/csv/CsvToJsonConverterV2.java b/gobblin-core/src/main/java/org/apache/gobblin/converter/csv/CsvToJsonConverterV2.java
index b4829f0..e69f277 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/converter/csv/CsvToJsonConverterV2.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/converter/csv/CsvToJsonConverterV2.java
@@ -30,6 +30,7 @@ import com.google.gson.JsonElement;
 import com.google.gson.JsonNull;
 import com.google.gson.JsonObject;
 import com.google.gson.JsonParser;
+import com.google.gson.JsonPrimitive;
 
 import org.apache.gobblin.configuration.WorkUnitState;
 import org.apache.gobblin.converter.Converter;
@@ -69,6 +70,8 @@ public class CsvToJsonConverterV2 extends Converter<String, JsonArray, String[],
   private static final Logger LOG = LoggerFactory.getLogger(CsvToJsonConverterV2.class);
   private static final JsonParser JSON_PARSER = new JsonParser();
   private static final String COLUMN_NAME_KEY = "columnName";
+  private static final String DATA_TYPE_KEY = "dataType";
+  private static final String TYPE = "type";
   private static final String JSON_NULL_VAL = "null";
 
   public static final String CUSTOM_ORDERING = "converter.csv_to_json.custom_order";
@@ -173,12 +176,13 @@ public class CsvToJsonConverterV2 extends Converter<String, JsonArray, String[],
     JsonObject outputRecord = new JsonObject();
 
     for (int i = 0; i < outputSchema.size(); i++) {
-      String key = outputSchema.get(i).getAsJsonObject().get(COLUMN_NAME_KEY).getAsString();
+      JsonObject field = outputSchema.get(i).getAsJsonObject();
+      String key = field.get(COLUMN_NAME_KEY).getAsString();
 
       if (StringUtils.isEmpty(inputRecord[i]) || JSON_NULL_VAL.equalsIgnoreCase(inputRecord[i])) {
         outputRecord.add(key, JsonNull.INSTANCE);
       } else {
-        outputRecord.addProperty(key, inputRecord[i]);
+        outputRecord.add(key, convertValue(inputRecord[i], field.getAsJsonObject(DATA_TYPE_KEY)));
       }
     }
 
@@ -195,16 +199,115 @@ public class CsvToJsonConverterV2 extends Converter<String, JsonArray, String[],
    Iterator<String> customOrderIterator = customOrder.iterator();
 
     while(outputSchemaIterator.hasNext() && customOrderIterator.hasNext()) {
-      String key = outputSchemaIterator.next().getAsJsonObject().get(COLUMN_NAME_KEY).getAsString();
+      JsonObject field = outputSchemaIterator.next().getAsJsonObject();
+      String key = field.get(COLUMN_NAME_KEY).getAsString();
       int i = Integer.parseInt(customOrderIterator.next());
       Preconditions.checkArgument(i < inputRecord.length, "Index out of bound detected in customer order. Index: " + i + " , # of CSV columns: " + inputRecord.length);
       if (i < 0 || null == inputRecord[i] || JSON_NULL_VAL.equalsIgnoreCase(inputRecord[i])) {
         outputRecord.add(key, JsonNull.INSTANCE);
         continue;
       }
-      outputRecord.addProperty(key, inputRecord[i]);
+      outputRecord.add(key, convertValue(inputRecord[i], field.getAsJsonObject(DATA_TYPE_KEY)));
     }
 
     return outputRecord;
   }
+
+  /**
+   * Convert a CSV string value to the JSON type declared by the field's {@code dataType}.
+   *
+   * <p>Fields without a declared type, and declared types with no dedicated conversion
+   * (for example {@code date}, {@code timestamp} or {@code enum}), keep the raw string value
+   * instead of failing the record. For the other declared types except string, a blank value
+   * becomes JSON null, just as empty cells are mapped to JSON null when output records are built.
+   *
+   * @param value the raw, non-null CSV value
+   * @param dataType the field's {@code dataType} object from the output schema, or {@code null}
+   * @return the converted JSON element
+   * @throws NumberFormatException if a numeric type is declared and the value is not a number
+   */
+  private JsonElement convertValue(String value, JsonObject dataType) {
+    JsonElement typeElement = dataType == null ? null : dataType.get(TYPE);
+    if (typeElement == null || !typeElement.isJsonPrimitive()) {
+      return new JsonPrimitive(value);
+    }
+
+    // Locale.ROOT: under e.g. a Turkish default locale, "int".toUpperCase() is not "INT".
+    String type = typeElement.getAsString().toUpperCase(java.util.Locale.ROOT);
+    ValueType valueType = ValueType.forName(type);
+    if (valueType == null) {
+      // No dedicated conversion for this type: pass the raw string through rather than fail.
+      return new JsonPrimitive(value);
+    }
+    if (valueType != ValueType.STRING && StringUtils.isBlank(value)) {
+      // A blank cell carries no value for a typed field; treat it as missing instead of parsing it.
+      return JsonNull.INSTANCE;
+    }
+    return valueType.convert(value);
+  }
+
+  /**
+   * Conversions from a CSV string value to the JSON representation of a declared schema type.
+   */
+  private enum ValueType {
+    INT {
+      @Override
+      JsonElement convert(String value) {
+        // Parsing via double also accepts forms such as "42.0" or "4.2e1"; any fraction is truncated.
+        return new JsonPrimitive(Double.valueOf(value).intValue());
+      }
+    },
+    LONG {
+      @Override
+      JsonElement convert(String value) {
+        String trimmed = value.trim();
+        try {
+          // Parse integral text exactly: a double cannot represent every long above 2^53.
+          return new JsonPrimitive(Long.parseLong(trimmed));
+        } catch (NumberFormatException notIntegral) {
+          // Decimal or exponent forms (e.g. "42.0") fall back to double parsing, as INT does.
+          return new JsonPrimitive(Double.valueOf(trimmed).longValue());
+        }
+      }
+    },
+    FLOAT {
+      @Override
+      JsonElement convert(String value) {
+        // Parse straight to float; parsing to double and then narrowing can round twice.
+        return new JsonPrimitive(Float.valueOf(value));
+      }
+    },
+    DOUBLE {
+      @Override
+      JsonElement convert(String value) {
+        return new JsonPrimitive(Double.valueOf(value));
+      }
+    },
+    BOOLEAN {
+      @Override
+      JsonElement convert(String value) {
+        // NOTE(review): Boolean.valueOf maps anything other than "true" (ignoring case) to false.
+        return new JsonPrimitive(Boolean.valueOf(value));
+      }
+    },
+    STRING {
+      @Override
+      JsonElement convert(String value) {
+        return new JsonPrimitive(value);
+      }
+    };
+
+    /** Convert a non-null string value to this type's JSON representation. */
+    abstract JsonElement convert(String value);
+
+    /** Return the constant named exactly {@code name}, or {@code null} if there is none. */
+    static ValueType forName(String name) {
+      for (ValueType candidate : values()) {
+        if (candidate.name().equals(name)) {
+          return candidate;
+        }
+      }
+      return null;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1b7748a6/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/filebased/FileBasedSource.java
----------------------------------------------------------------------
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/filebased/FileBasedSource.java b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/filebased/FileBasedSource.java
index a26b052..d693f44 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/filebased/FileBasedSource.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/source/extractor/filebased/FileBasedSource.java
@@ -215,10 +215,11 @@ public abstract class FileBasedSource<S, D> extends AbstractSource<S, D> {
       results = this.fsHelper.ls(path);
       for (int i = 0; i < results.size(); i++) {
         URI uri = new URI(results.get(i));
-        File file = uri.isAbsolute()?
-            new File(uri) : new File(state.getProp(ConfigurationKeys.SOURCE_FILEBASED_DATA_DIRECTORY), uri.toString());
-
-        String filePath = file.getAbsolutePath();
+        String filePath = uri.toString();
+        if (!uri.isAbsolute()) {
+          File file = new File(state.getProp(ConfigurationKeys.SOURCE_FILEBASED_DATA_DIRECTORY), uri.toString());
+          filePath = file.getAbsolutePath();
+        }
         results.set(i, filePath + this.splitPattern + this.fsHelper.getFileMTime(filePath));
       }
     } catch (FileBasedHelperException | URISyntaxException e) {

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1b7748a6/gobblin-core/src/test/resources/converter/csv/10_fields.json
----------------------------------------------------------------------
diff --git a/gobblin-core/src/test/resources/converter/csv/10_fields.json b/gobblin-core/src/test/resources/converter/csv/10_fields.json
index c59d524..ccc1349 100644
--- a/gobblin-core/src/test/resources/converter/csv/10_fields.json
+++ b/gobblin-core/src/test/resources/converter/csv/10_fields.json
@@ -1,12 +1,12 @@
 {
   "Date": "20160924",
   "DeviceCategory": "desktop",
-  "Sessions": "42935",
-  "BounceRate": "0.0446255968324211",
-  "AvgSessionDuration": "1590.4702457202748",
-  "Pageviews": "348380",
-  "PageviewsPerSession": "8.1141260044252945",
-  "UniquePageviews": "232467",
-  "AvgTimeOnPage": "206.98603475430664",
-  "User_count": "33028"
+  "Sessions": 42935,
+  "BounceRate": 0.0446255968324211,
+  "AvgSessionDuration": 1590.4702457202748,
+  "Pageviews": 348380,
+  "PageviewsPerSession": 8.1141260044252945,
+  "UniquePageviews": 232467,
+  "AvgTimeOnPage": 206.98603475430664,
+  "User_count": 33028
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1b7748a6/gobblin-core/src/test/resources/converter/csv/11_fields_with_null.json
----------------------------------------------------------------------
diff --git a/gobblin-core/src/test/resources/converter/csv/11_fields_with_null.json b/gobblin-core/src/test/resources/converter/csv/11_fields_with_null.json
index 178de16..f4c0637 100644
--- a/gobblin-core/src/test/resources/converter/csv/11_fields_with_null.json
+++ b/gobblin-core/src/test/resources/converter/csv/11_fields_with_null.json
@@ -2,12 +2,12 @@
   "Date": "20160924",
   "DeviceCategory": "desktop",
   "Segment": null,
-  "Sessions": "42935",
-  "BounceRate": "0.0446255968324211",
-  "AvgSessionDuration": "1590.4702457202748",
-  "Pageviews": "348380",
-  "PageviewsPerSession": "8.1141260044252945",
-  "UniquePageviews": "232467",
-  "AvgTimeOnPage": "206.98603475430664",
-  "User_count": "33028"
+  "Sessions": 42935,
+  "BounceRate": 0.0446255968324211,
+  "AvgSessionDuration": 1590.4702457202748,
+  "Pageviews": 348380,
+  "PageviewsPerSession": 8.1141260044252945,
+  "UniquePageviews": 232467,
+  "AvgTimeOnPage": 206.98603475430664,
+  "User_count": 33028
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1b7748a6/gobblin-runtime/src/main/resources/templates/csv-kafka.template
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/resources/templates/csv-kafka.template b/gobblin-runtime/src/main/resources/templates/csv-kafka.template
new file mode 100644
index 0000000..d85ce1a
--- /dev/null
+++ b/gobblin-runtime/src/main/resources/templates/csv-kafka.template
@@ -0,0 +1,74 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# ====================================================================
+# Job configurations
+# ====================================================================
+
+# Required configuration constraints
+gobblin.template.required_attributes="input.fs.uri,input.dir,job.work.dir,state.store.fs.uri,writer.fs.uri,csv.schema.json,csv.kafka.brokers,csv.kafka.topic"
+
+# Job info
+job.name=CsvKafka
+job.lock.enabled=false
+
+# Gobblin data storage configurations
+state.store.dir=${job.work.dir}/state-store
+task.data.root.dir=${job.work.dir}/task-data
+writer.staging.dir=${job.work.dir}/task-staging
+writer.output.dir=${job.work.dir}/task-output
+data.publisher.final.dir=${job.work.dir}/job-output
+
+# Gobblin MapReduce configurations
+mr.job.root.dir=${job.work.dir}/working
+mr.job.lock.dir=${job.work.dir}/locks
+
+# Source
+source.class=org.apache.gobblin.source.extractor.filebased.TextFileBasedSource
+source.filebased.downloader.class=org.apache.gobblin.source.extractor.filebased.CsvFileDownloader
+source.schema=${csv.schema.json}
+
+## Skip header
+source.skip.first.record=true
+source.filebased.fs.uri=${input.fs.uri}
+source.filebased.data.directory=${input.dir}
+source.filebased.maxFilesPerRun=1
+source.max.number.of.partitions=3
+
+## Extract
+extract.namespace=data
+extract.table.name=csv
+extract.table.type=SNAPSHOT_APPEND
+
+# Task execution configurations
+taskexecutor.threadpool.size=4
+taskretry.threadpool.coresize=1
+taskretry.threadpool.maxsize=1
+
+# Converter
+converter.classes=org.apache.gobblin.converter.csv.CsvToJsonConverterV2
+
+# Writer
+writer.destination.type=KAFKA
+writer.output.format=json
+writer.builder.class=org.apache.gobblin.kafka.writer.Kafka09JsonObjectWriterBuilder
+
+writer.kafka.topic=${csv.kafka.topic}
+writer.kafka.producerConfig.value.serializer=org.apache.kafka.common.serialization.StringSerializer
+writer.kafka.producerConfig.bootstrap.servers=${csv.kafka.brokers}
+writer.kafka.producerConfig.retries=3
+writer.kafka.producerConfig.client.id=CsvKafka
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/1b7748a6/gobblin-runtime/src/main/resources/templates/kafka-hdfs.template
----------------------------------------------------------------------
diff --git a/gobblin-runtime/src/main/resources/templates/kafka-hdfs.template b/gobblin-runtime/src/main/resources/templates/kafka-hdfs.template
new file mode 100644
index 0000000..773f848
--- /dev/null
+++ b/gobblin-runtime/src/main/resources/templates/kafka-hdfs.template
@@ -0,0 +1,69 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# ====================================================================
+# Job configurations
+# ====================================================================
+
+# Required configuration constraints
+gobblin.template.required_attributes="job.work.dir,state.store.fs.uri,writer.fs.uri,kafka.schema.json,kafka.brokers,kafka.topics"
+
+# Job info
+job.name=KafkaHDFS
+job.lock.enabled=false
+
+# Gobblin data storage configurations
+state.store.dir=${job.work.dir}/state-store
+task.data.root.dir=${job.work.dir}/task-data
+writer.staging.dir=${job.work.dir}/task-staging
+writer.output.dir=${job.work.dir}/task-output
+data.publisher.final.dir=${job.work.dir}/job-output
+
+# Gobblin MapReduce configurations
+mr.job.root.dir=${job.work.dir}/working
+mr.job.lock.dir=${job.work.dir}/locks
+
+# Source
+source.class=org.apache.gobblin.source.extractor.extract.kafka.Kafka09JsonSource
+source.kafka.json.schema=${kafka.schema.json}
+
+topic.whitelist=${kafka.topics}
+org.apache.gobblin.kafka.consumerClient.class="org.apache.gobblin.kafka.client.Kafka09ConsumerClient$Factory"
+bootstrap.with.offset=earliest
+kafka.workunit.packer.type=SINGLE_LEVEL
+mr.job.max.mappers=10
+
+## Extract
+extract.namespace=kafka
+extract.table.type=SNAPSHOT_APPEND
+extract.limit.type=time
+extract.limit.enabled=true
+extract.limit.timeLimit=5
+extract.limit.timeLimitTimeunit=minutes
+
+# Task execution configurations
+taskexecutor.threadpool.size=4
+taskretry.threadpool.coresize=1
+taskretry.threadpool.maxsize=1
+
+# Converter
+converter.classes=org.apache.gobblin.converter.avro.JsonIntermediateToAvroConverter
+
+# Writer
+writer.destination.type=HDFS
+writer.output.format=AVRO
+writer.builder.class=org.apache.gobblin.writer.AvroDataWriterBuilder
\ No newline at end of file