You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by ch...@apache.org on 2016/08/19 08:27:44 UTC
apex-malhar git commit: APEXMALHAR-2151 delimited file format support
to FSLoader
Repository: apex-malhar
Updated Branches:
refs/heads/master 9b6e11d85 -> 571db6c06
APEXMALHAR-2151 delimited file format support to FSLoader
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/571db6c0
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/571db6c0
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/571db6c0
Branch: refs/heads/master
Commit: 571db6c063a0cd34d9379148f3eada85b16a0760
Parents: 9b6e11d
Author: shubham <sh...@github.com>
Authored: Wed Aug 10 14:13:10 2016 +0530
Committer: shubham <sh...@github.com>
Committed: Thu Aug 18 14:05:30 2016 +0530
----------------------------------------------------------------------
.../contrib/enrich/DelimitedFSLoader.java | 165 +++++++++++++++++++
.../datatorrent/contrib/enrich/FSLoader.java | 52 +++---
.../contrib/enrich/JsonFSLoader.java | 74 +++++++++
.../contrib/enrich/FileEnrichmentTest.java | 70 +++++++-
.../src/test/resources/productmapping-delim.txt | 100 +++++++++++
5 files changed, 431 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/571db6c0/contrib/src/main/java/com/datatorrent/contrib/enrich/DelimitedFSLoader.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/enrich/DelimitedFSLoader.java b/contrib/src/main/java/com/datatorrent/contrib/enrich/DelimitedFSLoader.java
new file mode 100644
index 0000000..25f283c
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/enrich/DelimitedFSLoader.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.enrich;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.supercsv.cellprocessor.ift.CellProcessor;
+import org.supercsv.io.CsvMapReader;
+import org.supercsv.prefs.CsvPreference;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.datatorrent.contrib.parser.CellProcessorBuilder;
+import com.datatorrent.contrib.parser.DelimitedSchema;
+import com.datatorrent.contrib.parser.DelimitedSchema.Field;
+import com.datatorrent.lib.util.ReusableStringReader;
+
+/**
+ * This implementation of {@link FSLoader} is used to load data from a
+ * delimited file. The user needs to provide a schema, as a string in JSON
+ * format as per {@link DelimitedSchema}, that describes the name and type of
+ * each field.
+ */
+@InterfaceStability.Evolving
+public class DelimitedFSLoader extends FSLoader
+{
+
+  /**
+   * Map Reader to read delimited records
+   */
+  private transient CsvMapReader csvMapReader;
+  /**
+   * Reader used by csvMapReader
+   */
+  private transient ReusableStringReader csvStringReader;
+  /**
+   * Contents of the schema. The schema is specified in JSON format as per
+   * {@link DelimitedSchema}
+   */
+  @NotNull
+  private String schema;
+  /**
+   * Schema is read into this object to access fields
+   */
+  private transient DelimitedSchema delimitedParserSchema;
+  /**
+   * Cell processors are an integral part of reading and writing with Super CSV
+   * they automate the data type conversions, and enforce constraints.
+   */
+  private transient CellProcessor[] processors;
+  /**
+   * Names of all the fields in the same order of incoming records
+   */
+  private transient String[] nameMapping;
+  /**
+   * Delimiter-separated string of field names; a line equal to this is treated
+   * as a header line and skipped
+   */
+  private transient String header;
+  /**
+   * Reading preferences that are passed through schema
+   */
+  private transient CsvPreference preference;
+
+  /**
+   * True once {@link #init()} has built the parsing state above. Kept transient
+   * like the state it guards: if this flag survived a serialization round trip
+   * while the readers did not, extractFields would skip init() and dereference
+   * null readers.
+   */
+  private transient boolean initialized;
+
+  private static final Logger logger = LoggerFactory.getLogger(DelimitedFSLoader.class);
+
+  public DelimitedFSLoader()
+  {
+  }
+
+  /**
+   * Extracts the fields from a delimited record and returns a map containing
+   * field names and values. The parsing state is built lazily on the first
+   * call.
+   *
+   * @param line
+   *          a single record from the file
+   * @return map of field name to parsed value, or null when the line is blank,
+   *         equals the header line, or cannot be read
+   */
+  @Override
+  Map<String, Object> extractFields(String line)
+  {
+    if (!initialized) {
+      init();
+      initialized = true;
+    }
+    if (StringUtils.isBlank(line) || StringUtils.equals(line, header)) {
+      return null;
+    }
+    try {
+      csvStringReader.open(line);
+      return csvMapReader.read(nameMapping, processors);
+    } catch (IOException e) {
+      // Pass the exception itself (not just its message) so the stack trace is logged
+      logger.error("Error parsing line {}", line, e);
+      return null;
+    }
+  }
+
+  /**
+   * Builds the schema object, CSV preferences, field-name mapping, header
+   * string, cell processors and readers from the configured schema string.
+   */
+  private void init()
+  {
+    delimitedParserSchema = new DelimitedSchema(schema);
+    preference = new CsvPreference.Builder(delimitedParserSchema.getQuoteChar(),
+        delimitedParserSchema.getDelimiterChar(), delimitedParserSchema.getLineDelimiter()).build();
+    nameMapping = delimitedParserSchema.getFieldNames().toArray(new String[0]);
+    header = StringUtils.join(nameMapping, String.valueOf((char)delimitedParserSchema.getDelimiterChar()));
+    processors = getProcessor(delimitedParserSchema.getFields());
+    csvStringReader = new ReusableStringReader();
+    csvMapReader = new CsvMapReader(csvStringReader, preference);
+  }
+
+  /**
+   * Returns array of cellprocessors, one for each field
+   *
+   * @param fields
+   *          fields of the schema, in record order
+   * @return one cell processor per field, in the same order as {@code fields}
+   */
+  private CellProcessor[] getProcessor(List<Field> fields)
+  {
+    CellProcessor[] processor = new CellProcessor[fields.size()];
+    int fieldCount = 0;
+    for (Field field : fields) {
+      processor[fieldCount++] = CellProcessorBuilder.getCellProcessor(field.getType(), field.getConstraints());
+    }
+    return processor;
+  }
+
+  /**
+   * Get the schema
+   *
+   * @return the schema string, in JSON format as per {@link DelimitedSchema}
+   */
+  public String getSchema()
+  {
+    return schema;
+  }
+
+  /**
+   * Set the schema
+   *
+   * @param schema
+   *          the schema string, in JSON format as per {@link DelimitedSchema}
+   */
+  public void setSchema(String schema)
+  {
+    this.schema = schema;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/571db6c0/contrib/src/main/java/com/datatorrent/contrib/enrich/FSLoader.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/enrich/FSLoader.java b/contrib/src/main/java/com/datatorrent/contrib/enrich/FSLoader.java
index 0c23c62..997243d 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/enrich/FSLoader.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/enrich/FSLoader.java
@@ -25,10 +25,6 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import org.codehaus.jackson.JsonProcessingException;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.map.ObjectReader;
-import org.codehaus.jackson.type.TypeReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,30 +40,18 @@ import com.google.common.collect.Maps;
import com.datatorrent.lib.db.cache.CacheManager;
import com.datatorrent.lib.util.FieldInfo;
-
/**
- * This implementation of {@link BackendLoader} loads the data from a given file into memory cache and serves queries
- * from the cache.
- * When this is set as primaryCache in {@link CacheManager}, CacheManager can call {@link #loadInitialData()}
- * periodically to reload the file.
- * <p>
- * The format of the input file is:
- * <p>
- * {"productCategory": 5, "productId": 0}
- * {"productCategory": 4, "productId": 1}
- * {"productCategory": 5, "productId": 2}
- * {"productCategory": 5, "productId": 3}
- * </p>
- * Each line in the input file should be a valid json object which represents a record and each key/value pair in that
- * json object represents the fields/value.
- * <p>
- * NOTE: This loader should be used with caution as all the data present in the file is loaded in memory because of
- * which the memory consumption may go up.
+ * This implementation of {@link BackendLoader} loads the data from a given file
+ * into memory cache and serves queries from the cache. When this is set as
+ * primaryCache in {@link CacheManager}, CacheManager can call
+ * {@link #loadInitialData()} periodically to reload the file. NOTE: This loader
+ * should be used with caution as all the data present in the file is loaded in
+ * memory because of which the memory consumption may go up.
*
* @since 3.4.0
*/
@InterfaceStability.Evolving
-public class FSLoader extends ReadOnlyBackup
+public abstract class FSLoader extends ReadOnlyBackup
{
@NotNull
private String fileName;
@@ -76,8 +60,6 @@ public class FSLoader extends ReadOnlyBackup
private transient FileSystem fs;
private transient boolean connected;
- private static final ObjectMapper mapper = new ObjectMapper();
- private static final ObjectReader reader = mapper.reader(new TypeReference<Map<String, Object>>(){});
private static final Logger logger = LoggerFactory.getLogger(FSLoader.class);
public String getFileName()
@@ -103,9 +85,11 @@ public class FSLoader extends ReadOnlyBackup
String line;
while ((line = bin.readLine()) != null) {
try {
- Map<String, Object> tuple = reader.readValue(line);
- result.put(getKey(tuple), getValue(tuple));
- } catch (JsonProcessingException parseExp) {
+ Map<String, Object> tuple = extractFields(line);
+ if (tuple != null && !tuple.isEmpty()) {
+ result.put(getKey(tuple), getValue(tuple));
+ }
+ } catch (Exception parseExp) {
logger.info("Unable to parse line {}", line);
}
}
@@ -128,6 +112,18 @@ public class FSLoader extends ReadOnlyBackup
return result;
}
+ /**
+ * This method is called by {@link #loadInitialData()} once per line read from
+ * the file to extract values from a record. Concrete implementations override
+ * this method to parse a record and convert it to a Map of field names and
+ * values, OR simply return null to skip the record.
+ *
+ * @param line
+ * A single record from file
+ * @return a map of field name to value. A null or empty map is ignored by
+ * the caller, i.e. the record is skipped
+ */
+ abstract Map<String, Object> extractFields(String line);
+
private Object getValue(Map<String, Object> tuple)
{
ArrayList<Object> includeTuple = new ArrayList<Object>();
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/571db6c0/contrib/src/main/java/com/datatorrent/contrib/enrich/JsonFSLoader.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/enrich/JsonFSLoader.java b/contrib/src/main/java/com/datatorrent/contrib/enrich/JsonFSLoader.java
new file mode 100644
index 0000000..a1d139a
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/enrich/JsonFSLoader.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.enrich;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.ObjectReader;
+import org.codehaus.jackson.type.TypeReference;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * This implementation of {@link FSLoader} is used to load data from a JSON
+ * file.
+ * <p>
+ * The input file needs to have one JSON object per line. E.g:
+ * <pre>
+ * {"productCategory": 5, "productId": 0}
+ * {"productCategory": 4, "productId": 1}
+ * {"productCategory": 5, "productId": 2}
+ * {"productCategory": 5, "productId": 3}
+ * </pre>
+ * Each line in the input file should be a valid JSON object which represents a
+ * record, and each key/value pair in that JSON object represents a
+ * field/value.
+ */
+@InterfaceStability.Evolving
+public class JsonFSLoader extends FSLoader
+{
+
+  private static final ObjectMapper mapper = new ObjectMapper();
+  private static final ObjectReader reader = mapper.reader(new TypeReference<Map<String, Object>>()
+  {
+  });
+
+  private static final Logger logger = LoggerFactory.getLogger(JsonFSLoader.class);
+
+  /**
+   * Extracts the fields from a JSON record and returns a map containing field
+   * names and values.
+   *
+   * @param line
+   *          a single record from the file; expected to be one JSON object
+   * @return map of field name to value, or null when the line cannot be read
+   *         as JSON
+   */
+  @Override
+  Map<String, Object> extractFields(String line)
+  {
+    try {
+      return reader.readValue(line);
+    } catch (IOException e) {
+      // The placeholder is filled with the offending line; the exception goes
+      // last so the logger records its stack trace.
+      logger.error("Exception while extracting fields from line {}", line, e);
+      return null;
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/571db6c0/contrib/src/test/java/com/datatorrent/contrib/enrich/FileEnrichmentTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/enrich/FileEnrichmentTest.java b/contrib/src/test/java/com/datatorrent/contrib/enrich/FileEnrichmentTest.java
index f24a13c..56f9c7f 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/enrich/FileEnrichmentTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/enrich/FileEnrichmentTest.java
@@ -21,7 +21,9 @@ package com.datatorrent.contrib.enrich;
import java.io.File;
import java.io.IOException;
import java.net.URL;
+import java.nio.charset.Charset;
import java.util.Arrays;
+import java.util.Date;
import java.util.Map;
import org.junit.Assert;
@@ -51,7 +53,7 @@ public class FileEnrichmentTest
FileUtils.copyFile(new File(origUrl.getPath()), new File(fileUrl.getPath()));
MapEnricher oper = new MapEnricher();
- FSLoader store = new FSLoader();
+ FSLoader store = new JsonFSLoader();
store.setFileName(fileUrl.toString());
oper.setLookupFields(Arrays.asList("productId"));
oper.setIncludeFields(Arrays.asList("productCategory"));
@@ -99,5 +101,69 @@ public class FileEnrichmentTest
Assert.assertEquals("value of product category is 1", 5, emitted.get("productCategory"));
Assert.assertTrue(emitted.get("productCategory") instanceof Integer);
}
-}
+ /**
+  * Enriches a map tuple through {@link MapEnricher} backed by a
+  * {@link DelimitedFSLoader} and verifies that both the Integer field
+  * (productCategory) and the Date field (mfgDate) of the matching record are
+  * added to the emitted tuple.
+  */
+ @Test
+ public void testEnrichmentOperatorDelimitedFSLoader() throws IOException, InterruptedException
+ {
+   URL origUrl = this.getClass().getResource("/productmapping-delim.txt");
+
+   // Work on a copy so the original test resource is left as it is
+   URL fileUrl = new URL(this.getClass().getResource("/").toString() + "productmapping-delim1.txt");
+   FileUtils.deleteQuietly(new File(fileUrl.getPath()));
+   FileUtils.copyFile(new File(origUrl.getPath()), new File(fileUrl.getPath()));
+   MapEnricher oper = new MapEnricher();
+   DelimitedFSLoader store = new DelimitedFSLoader();
+   store.setFileName(fileUrl.toString());
+   store.setSchema(
+       "{\"separator\":\",\",\"fields\": [{\"name\": \"productCategory\",\"type\": \"Integer\"},{\"name\": \"productId\",\"type\": \"Integer\"},{\"name\": \"mfgDate\",\"type\": \"Date\",\"constraints\": {\"format\": \"dd/MM/yyyy\"}}]}");
+   oper.setLookupFields(Arrays.asList("productId"));
+   oper.setIncludeFields(Arrays.asList("productCategory", "mfgDate"));
+   oper.setStore(store);
+
+   oper.setup(null);
+
+   CollectorTestSink<Map<String, Object>> sink = new CollectorTestSink<>();
+   @SuppressWarnings({ "unchecked", "rawtypes" })
+   CollectorTestSink<Object> tmp = (CollectorTestSink)sink;
+   oper.output.setSink(tmp);
+
+   oper.activate(null);
+
+   oper.beginWindow(0);
+   Map<String, Object> tuple = Maps.newHashMap();
+   tuple.put("productId", 3);
+   tuple.put("channelId", 4);
+   tuple.put("amount", 10.0);
+
+   Kryo kryo = new Kryo();
+   oper.input.process(kryo.copy(tuple));
+
+   oper.endWindow();
+
+   oper.deactivate();
+
+   /* Number of tuple, emitted */
+   Assert.assertEquals("Number of tuple emitted ", 1, sink.collectedTuples.size());
+   Map<String, Object> emitted = sink.collectedTuples.iterator().next();
+
+   /* The fields present in original event is kept as it is */
+   Assert.assertEquals("Number of fields in emitted tuple", 5, emitted.size());
+   Assert.assertEquals("value of productId is 3", tuple.get("productId"), emitted.get("productId"));
+   Assert.assertEquals("value of channelId is 4", tuple.get("channelId"), emitted.get("channelId"));
+   Assert.assertEquals("value of amount is 10.0", tuple.get("amount"), emitted.get("amount"));
+
+   /* Check if productCategory is added to the event */
+   Assert.assertEquals("productCategory is part of tuple", true, emitted.containsKey("productCategory"));
+   Assert.assertEquals("value of product category is 5", 5, emitted.get("productCategory"));
+   Assert.assertTrue(emitted.get("productCategory") instanceof Integer);
+
+   /* Check if mfgDate is added to the event */
+   Assert.assertEquals("mfgDate is part of tuple", true, emitted.containsKey("mfgDate"));
+   /* Verify the type before casting so a wrong type fails as an assertion */
+   Assert.assertTrue(emitted.get("mfgDate") instanceof Date);
+   Date mfgDate = (Date)emitted.get("mfgDate");
+   Assert.assertEquals("value of day", 1, mfgDate.getDate());
+   Assert.assertEquals("value of month", 0, mfgDate.getMonth());
+   Assert.assertEquals("value of year", 2016, mfgDate.getYear() + 1900);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/571db6c0/contrib/src/test/resources/productmapping-delim.txt
----------------------------------------------------------------------
diff --git a/contrib/src/test/resources/productmapping-delim.txt b/contrib/src/test/resources/productmapping-delim.txt
new file mode 100755
index 0000000..1a685cb
--- /dev/null
+++ b/contrib/src/test/resources/productmapping-delim.txt
@@ -0,0 +1,100 @@
+5,0,01/01/2016
+4,1,01/01/2016
+5,2,01/01/2016
+5,3,01/01/2016
+5,4,01/01/2016
+1,5,01/01/2016
+2,6,01/01/2016
+4,7,01/01/2016
+2,8,01/01/2016
+3,9,01/01/2016
+1,10,01/01/2016
+5,11,01/01/2016
+5,12,01/01/2016
+1,13,01/01/2016
+1,14,01/01/2016
+2,15,01/01/2016
+3,16,01/01/2016
+5,17,01/01/2016
+2,18,01/01/2016
+2,19,01/01/2016
+2,20,01/01/2016
+3,21,01/01/2016
+2,22,01/01/2016
+5,23,01/01/2016
+4,24,01/01/2016
+1,25,01/01/2016
+3,26,01/01/2016
+3,27,01/01/2016
+3,28,01/01/2016
+5,29,01/01/2016
+2,30,01/01/2016
+3,31,01/01/2016
+3,32,01/01/2016
+3,33,01/01/2016
+1,34,01/01/2016
+3,35,01/01/2016
+2,36,01/01/2016
+1,37,01/01/2016
+3,38,01/01/2016
+2,39,01/01/2016
+1,40,01/01/2016
+5,41,01/01/2016
+3,42,01/01/2016
+5,43,01/01/2016
+2,44,01/01/2016
+4,45,01/01/2016
+5,46,01/01/2016
+2,47,01/01/2016
+3,48,01/01/2016
+5,49,01/01/2016
+5,50,01/01/2016
+4,51,01/01/2016
+5,52,01/01/2016
+1,53,01/01/2016
+5,54,01/01/2016
+4,55,01/01/2016
+4,56,01/01/2016
+2,57,01/01/2016
+4,58,01/01/2016
+4,59,01/01/2016
+4,60,01/01/2016
+1,61,01/01/2016
+2,62,01/01/2016
+3,63,01/01/2016
+5,64,01/01/2016
+1,65,01/01/2016
+5,66,01/01/2016
+5,67,01/01/2016
+2,68,01/01/2016
+3,69,01/01/2016
+3,70,01/01/2016
+2,71,01/01/2016
+3,72,01/01/2016
+4,73,01/01/2016
+2,74,01/01/2016
+3,75,01/01/2016
+3,76,01/01/2016
+4,77,01/01/2016
+5,78,01/01/2016
+4,79,01/01/2016
+1,80,01/01/2016
+1,81,01/01/2016
+1,82,01/01/2016
+3,83,01/01/2016
+1,84,01/01/2016
+5,85,01/01/2016
+3,86,01/01/2016
+4,87,01/01/2016
+1,88,01/01/2016
+5,89,01/01/2016
+3,90,01/01/2016
+5,91,01/01/2016
+2,92,01/01/2016
+2,93,01/01/2016
+3,94,01/01/2016
+1,95,01/01/2016
+1,96,01/01/2016
+5,97,01/01/2016
+3,98,01/01/2016
+5,99,01/01/2016