Posted to commits@apex.apache.org by ch...@apache.org on 2016/08/19 08:27:44 UTC

apex-malhar git commit: APEXMALHAR-2151 delimited file format support to FSLoader

Repository: apex-malhar
Updated Branches:
  refs/heads/master 9b6e11d85 -> 571db6c06


APEXMALHAR-2151 delimited file format support to FSLoader


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/571db6c0
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/571db6c0
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/571db6c0

Branch: refs/heads/master
Commit: 571db6c063a0cd34d9379148f3eada85b16a0760
Parents: 9b6e11d
Author: shubham <sh...@github.com>
Authored: Wed Aug 10 14:13:10 2016 +0530
Committer: shubham <sh...@github.com>
Committed: Thu Aug 18 14:05:30 2016 +0530

----------------------------------------------------------------------
 .../contrib/enrich/DelimitedFSLoader.java       | 165 +++++++++++++++++++
 .../datatorrent/contrib/enrich/FSLoader.java    |  52 +++---
 .../contrib/enrich/JsonFSLoader.java            |  74 +++++++++
 .../contrib/enrich/FileEnrichmentTest.java      |  70 +++++++-
 .../src/test/resources/productmapping-delim.txt | 100 +++++++++++
 5 files changed, 431 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/571db6c0/contrib/src/main/java/com/datatorrent/contrib/enrich/DelimitedFSLoader.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/enrich/DelimitedFSLoader.java b/contrib/src/main/java/com/datatorrent/contrib/enrich/DelimitedFSLoader.java
new file mode 100644
index 0000000..25f283c
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/enrich/DelimitedFSLoader.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.enrich;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.supercsv.cellprocessor.ift.CellProcessor;
+import org.supercsv.io.CsvMapReader;
+import org.supercsv.prefs.CsvPreference;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.datatorrent.contrib.parser.CellProcessorBuilder;
+import com.datatorrent.contrib.parser.DelimitedSchema;
+import com.datatorrent.contrib.parser.DelimitedSchema.Field;
+import com.datatorrent.lib.util.ReusableStringReader;
+
+/**
+ * This implementation of {@link FSLoader} is used to load data from a
+ * delimited file. The user needs to provide a schema as a JSON string, as
+ * specified by {@link DelimitedSchema}, that describes the name and type of
+ * each field.
+ */
+@InterfaceStability.Evolving
+public class DelimitedFSLoader extends FSLoader
+{
+
+  /**
+   * Map Reader to read delimited records
+   */
+  private transient CsvMapReader csvMapReader;
+  /**
+   * Reader used by csvMapReader
+   */
+  private transient ReusableStringReader csvStringReader;
+  /**
+   * Contents of the schema. The schema is specified in JSON format as per
+   * {@link DelimitedSchema}.
+   */
+  @NotNull
+  private String schema;
+  /**
+   * Schema is read into this object to access fields
+   */
+  private transient DelimitedSchema delimitedParserSchema;
+  /**
+   * Cell processors are an integral part of reading and writing with Super CSV;
+   * they automate the data type conversions and enforce constraints.
+   */
+  private transient CellProcessor[] processors;
+  /**
+   * Names of all the fields in the same order of incoming records
+   */
+  private transient String[] nameMapping;
+  /**
+   * Header: a delimiter-separated string of the field names
+   */
+  private transient String header;
+  /**
+   * Reading preferences that are passed through the schema
+   */
+  private transient CsvPreference preference;
+
+  private boolean initialized;
+
+  private static final Logger logger = LoggerFactory.getLogger(DelimitedFSLoader.class);
+
+  public DelimitedFSLoader()
+  {
+  }
+
+  /**
+   * Extracts the fields from a delimited record and returns a map containing
+   * field names and values
+   */
+  @Override
+  Map<String, Object> extractFields(String line)
+  {
+    if (!initialized) {
+      init();
+      initialized = true;
+    }
+    if (StringUtils.isBlank(line) || StringUtils.equals(line, header)) {
+      return null;
+    }
+    try {
+      csvStringReader.open(line);
+      return csvMapReader.read(nameMapping, processors);
+    } catch (IOException e) {
+      logger.error("Error parsing line{} Exception {}", line, e.getMessage());
+      return null;
+    }
+  }
+
+  private void init()
+  {
+
+    delimitedParserSchema = new DelimitedSchema(schema);
+    preference = new CsvPreference.Builder(delimitedParserSchema.getQuoteChar(),
+        delimitedParserSchema.getDelimiterChar(), delimitedParserSchema.getLineDelimiter()).build();
+    nameMapping = delimitedParserSchema.getFieldNames()
+        .toArray(new String[delimitedParserSchema.getFieldNames().size()]);
+    header = StringUtils.join(nameMapping, (char)delimitedParserSchema.getDelimiterChar() + "");
+    processors = getProcessor(delimitedParserSchema.getFields());
+    csvStringReader = new ReusableStringReader();
+    csvMapReader = new CsvMapReader(csvStringReader, preference);
+  }
+
+  /**
+   * Returns array of cellprocessors, one for each field
+   */
+  private CellProcessor[] getProcessor(List<Field> fields)
+  {
+    CellProcessor[] processor = new CellProcessor[fields.size()];
+    int fieldCount = 0;
+    for (Field field : fields) {
+      processor[fieldCount++] = CellProcessorBuilder.getCellProcessor(field.getType(), field.getConstraints());
+    }
+    return processor;
+  }
+
+  /**
+   * Get the schema
+   *
+   * @return the schema as a JSON string
+   */
+  public String getSchema()
+  {
+    return schema;
+  }
+
+  /**
+   * Set the schema
+   *
+   * @param schema the schema as a JSON string
+   */
+  public void setSchema(String schema)
+  {
+    this.schema = schema;
+  }
+
+}
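
For context, a minimal sketch of wiring the new DelimitedFSLoader into a
MapEnricher, mirroring the updated test further below. The class name, file
path, and two-field schema here are illustrative; the schema string follows
the DelimitedSchema JSON format:

  import java.util.Arrays;

  import com.datatorrent.contrib.enrich.DelimitedFSLoader;
  import com.datatorrent.contrib.enrich.MapEnricher;

  public class DelimitedEnrichExample
  {
    public static MapEnricher buildEnricher()
    {
      DelimitedFSLoader store = new DelimitedFSLoader();
      // Hypothetical lookup file; one delimited record per line.
      store.setFileName("hdfs:///lookup/productmapping-delim.txt");
      // Schema per DelimitedSchema: separator plus field names and types.
      store.setSchema("{\"separator\":\",\",\"fields\": ["
          + "{\"name\": \"productCategory\",\"type\": \"Integer\"},"
          + "{\"name\": \"productId\",\"type\": \"Integer\"}]}");

      MapEnricher enricher = new MapEnricher();
      enricher.setLookupFields(Arrays.asList("productId"));
      enricher.setIncludeFields(Arrays.asList("productCategory"));
      enricher.setStore(store);
      return enricher;
    }
  }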

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/571db6c0/contrib/src/main/java/com/datatorrent/contrib/enrich/FSLoader.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/enrich/FSLoader.java b/contrib/src/main/java/com/datatorrent/contrib/enrich/FSLoader.java
index 0c23c62..997243d 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/enrich/FSLoader.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/enrich/FSLoader.java
@@ -25,10 +25,6 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
-import org.codehaus.jackson.JsonProcessingException;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.map.ObjectReader;
-import org.codehaus.jackson.type.TypeReference;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -44,30 +40,18 @@ import com.google.common.collect.Maps;
 import com.datatorrent.lib.db.cache.CacheManager;
 import com.datatorrent.lib.util.FieldInfo;
 
-
 /**
- * This implementation of {@link BackendLoader} loads the data from a given file into memory cache and serves queries
- * from the cache.
- * When this is set as primaryCache in {@link CacheManager}, CacheManager can call {@link #loadInitialData()}
- * periodically to reload the file.
- * <p>
- * The format of the input file is:
- * <p>
- * {"productCategory": 5, "productId": 0}
- * {"productCategory": 4, "productId": 1}
- * {"productCategory": 5, "productId": 2}
- * {"productCategory": 5, "productId": 3}
- * </p>
- * Each line in the input file should be a valid json object which represents a record and each key/value pair in that
- * json object represents the fields/value.
- * <p>
- * NOTE: This loader should be used with caution as all the data present in the file is loaded in memory because of
- * which the memory consumption may go up.
+ * This implementation of {@link BackendLoader} loads the data from a given file
+ * into memory cache and serves queries from the cache. When this is set as
+ * primaryCache in {@link CacheManager}, CacheManager can call
+ * {@link #loadInitialData()} periodically to reload the file. NOTE: This loader
+ * should be used with caution as the entire file is loaded into memory, which
+ * can drive up memory consumption for large files.
  *
  * @since 3.4.0
  */
 @InterfaceStability.Evolving
-public class FSLoader extends ReadOnlyBackup
+public abstract class FSLoader extends ReadOnlyBackup
 {
   @NotNull
   private String fileName;
@@ -76,8 +60,6 @@ public class FSLoader extends ReadOnlyBackup
   private transient FileSystem fs;
   private transient boolean connected;
 
-  private static final ObjectMapper mapper = new ObjectMapper();
-  private static final ObjectReader reader = mapper.reader(new TypeReference<Map<String, Object>>(){});
   private static final Logger logger = LoggerFactory.getLogger(FSLoader.class);
 
   public String getFileName()
@@ -103,9 +85,11 @@ public class FSLoader extends ReadOnlyBackup
       String line;
       while ((line = bin.readLine()) != null) {
         try {
-          Map<String, Object> tuple = reader.readValue(line);
-          result.put(getKey(tuple), getValue(tuple));
-        } catch (JsonProcessingException parseExp) {
+          Map<String, Object> tuple = extractFields(line);
+          if (tuple != null && !tuple.isEmpty()) {
+            result.put(getKey(tuple), getValue(tuple));
+          }
+        } catch (Exception parseExp) {
           logger.info("Unable to parse line {}", line);
         }
       }
@@ -128,6 +112,18 @@ public class FSLoader extends ReadOnlyBackup
     return result;
   }
 
+  /**
+   * This method is called by {@link #loadInitialData()} to extract values from
+   * a record. Concrete implementations override this method to parse a record
+   * and convert it to a Map of field names and values, or simply return null
+   * to skip the record.
+   *
+   * @param line
+   *          a single record from the file
+   * @return a map of field names to values; a null return value is ignored
+   */
+  abstract Map<String, Object> extractFields(String line);
+
   private Object getValue(Map<String, Object> tuple)
   {
     ArrayList<Object> includeTuple = new ArrayList<Object>();
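
To illustrate the new contract: extractFields(String) is package-private, so
concrete loaders live in com.datatorrent.contrib.enrich. A minimal
hypothetical subclass, assuming a two-column tab-separated input file with
illustrative field names:

  package com.datatorrent.contrib.enrich;

  import java.util.HashMap;
  import java.util.Map;

  // Hypothetical loader for a two-column tab-separated file.
  public class TabSeparatedFSLoader extends FSLoader
  {
    @Override
    Map<String, Object> extractFields(String line)
    {
      if (line == null || line.trim().isEmpty()) {
        // A null return is ignored by loadInitialData(), so the record is skipped.
        return null;
      }
      String[] parts = line.split("\t");
      Map<String, Object> tuple = new HashMap<String, Object>();
      // Illustrative field names; a real loader would derive them from a schema.
      tuple.put("productCategory", Integer.parseInt(parts[0]));
      tuple.put("productId", Integer.parseInt(parts[1]));
      return tuple;
    }
  }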

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/571db6c0/contrib/src/main/java/com/datatorrent/contrib/enrich/JsonFSLoader.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/enrich/JsonFSLoader.java b/contrib/src/main/java/com/datatorrent/contrib/enrich/JsonFSLoader.java
new file mode 100644
index 0000000..a1d139a
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/enrich/JsonFSLoader.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.enrich;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.ObjectReader;
+import org.codehaus.jackson.type.TypeReference;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * This implementation of {@link FSLoader} is used to load data from a JSON file.
+ * <p>
+ * The input file needs to have one JSON object per line. E.g.:
+ * <p>
+ * {"productCategory": 5, "productId": 0}
+ * {"productCategory": 4, "productId": 1}
+ * {"productCategory": 5, "productId": 2}
+ * {"productCategory": 5, "productId": 3}
+ * </p>
+ * Each line in the input file should be a valid JSON object representing a
+ * record, where each key/value pair in that object represents a field name
+ * and its value.
+ * <p>
+ *
+ */
+@InterfaceStability.Evolving
+public class JsonFSLoader extends FSLoader
+{
+
+  private static final ObjectMapper mapper = new ObjectMapper();
+  private static final ObjectReader reader = mapper.reader(new TypeReference<Map<String, Object>>()
+  {
+  });
+
+  private static final Logger logger = LoggerFactory.getLogger(JsonFSLoader.class);
+
+  /**
+   * Extracts the fields from a json record and returns a map containing field
+   * names and values
+   */
+  @Override
+  Map<String, Object> extractFields(String line)
+  {
+    try {
+      return reader.readValue(line);
+    } catch (IOException e) {
+      logger.error("Exception while extracting fields {}", e);
+    }
+    return null;
+  }
+
+}
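
Relative to the old concrete FSLoader, swapping in the JSON loader is a
one-line change on the application side, e.g. (the path is hypothetical; the
file must contain one JSON object per line):

  FSLoader store = new JsonFSLoader();  // was: new FSLoader()
  store.setFileName("file:///tmp/productmapping.txt");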

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/571db6c0/contrib/src/test/java/com/datatorrent/contrib/enrich/FileEnrichmentTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/enrich/FileEnrichmentTest.java b/contrib/src/test/java/com/datatorrent/contrib/enrich/FileEnrichmentTest.java
index f24a13c..56f9c7f 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/enrich/FileEnrichmentTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/enrich/FileEnrichmentTest.java
@@ -21,7 +21,9 @@ package com.datatorrent.contrib.enrich;
 import java.io.File;
 import java.io.IOException;
 import java.net.URL;
+import java.nio.charset.Charset;
 import java.util.Arrays;
+import java.util.Date;
 import java.util.Map;
 
 import org.junit.Assert;
@@ -51,7 +53,7 @@ public class FileEnrichmentTest
     FileUtils.copyFile(new File(origUrl.getPath()), new File(fileUrl.getPath()));
 
     MapEnricher oper = new MapEnricher();
-    FSLoader store = new FSLoader();
+    FSLoader store = new JsonFSLoader();
     store.setFileName(fileUrl.toString());
     oper.setLookupFields(Arrays.asList("productId"));
     oper.setIncludeFields(Arrays.asList("productCategory"));
@@ -99,5 +101,69 @@ public class FileEnrichmentTest
     Assert.assertEquals("value of product category is 1", 5, emitted.get("productCategory"));
     Assert.assertTrue(emitted.get("productCategory") instanceof Integer);
   }
-}
 
+  @Test
+  public void testEnrichmentOperatorDelimitedFSLoader() throws IOException, InterruptedException
+  {
+    URL origUrl = this.getClass().getResource("/productmapping-delim.txt");
+
+    URL fileUrl = new URL(this.getClass().getResource("/").toString() + "productmapping-delim1.txt");
+    FileUtils.deleteQuietly(new File(fileUrl.getPath()));
+    FileUtils.copyFile(new File(origUrl.getPath()), new File(fileUrl.getPath()));
+    MapEnricher oper = new MapEnricher();
+    DelimitedFSLoader store = new DelimitedFSLoader();
+    // store.setFieldDescription("productCategory:INTEGER,productId:INTEGER");
+    store.setFileName(fileUrl.toString());
+    store.setSchema(
+        "{\"separator\":\",\",\"fields\": [{\"name\": \"productCategory\",\"type\": \"Integer\"},{\"name\": \"productId\",\"type\": \"Integer\"},{\"name\": \"mfgDate\",\"type\": \"Date\",\"constraints\": {\"format\": \"dd/MM/yyyy\"}}]}");
+    oper.setLookupFields(Arrays.asList("productId"));
+    oper.setIncludeFields(Arrays.asList("productCategory", "mfgDate"));
+    oper.setStore(store);
+
+    oper.setup(null);
+
+    CollectorTestSink<Map<String, Object>> sink = new CollectorTestSink<>();
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    CollectorTestSink<Object> tmp = (CollectorTestSink)sink;
+    oper.output.setSink(tmp);
+
+    oper.activate(null);
+
+    oper.beginWindow(0);
+    Map<String, Object> tuple = Maps.newHashMap();
+    tuple.put("productId", 3);
+    tuple.put("channelId", 4);
+    tuple.put("amount", 10.0);
+
+    Kryo kryo = new Kryo();
+    oper.input.process(kryo.copy(tuple));
+
+    oper.endWindow();
+
+    oper.deactivate();
+
+    /* Number of tuples emitted */
+    Assert.assertEquals("Number of tuples emitted", 1, sink.collectedTuples.size());
+    Map<String, Object> emitted = sink.collectedTuples.iterator().next();
+
+    /* The fields present in the original event are kept as is */
+    Assert.assertEquals("Number of fields in emitted tuple", 5, emitted.size());
+    Assert.assertEquals("value of productId is 3", tuple.get("productId"), emitted.get("productId"));
+    Assert.assertEquals("value of channelId is 4", tuple.get("channelId"), emitted.get("channelId"));
+    Assert.assertEquals("value of amount is 10.0", tuple.get("amount"), emitted.get("amount"));
+
+    /* Check if productCategory is added to the event */
+    Assert.assertEquals("productCategory is part of tuple", true, emitted.containsKey("productCategory"));
+    Assert.assertEquals("value of product category is 1", 5, emitted.get("productCategory"));
+    Assert.assertTrue(emitted.get("productCategory") instanceof Integer);
+
+    /* Check if mfgDate is added to the event */
+    Assert.assertEquals("mfgDate is part of tuple", true, emitted.containsKey("productCategory"));
+    Date mfgDate = (Date)emitted.get("mfgDate");
+    Assert.assertEquals("value of day", 1, mfgDate.getDate());
+    Assert.assertEquals("value of month", 0, mfgDate.getMonth());
+    Assert.assertEquals("value of year", 2016, mfgDate.getYear() + 1900);
+    Assert.assertTrue(emitted.get("mfgDate") instanceof Date);
+  }
+
+}
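
For readability, the schema string passed to setSchema in the test above,
pretty-printed (same content, in the DelimitedSchema JSON format):

  {
    "separator": ",",
    "fields": [
      {"name": "productCategory", "type": "Integer"},
      {"name": "productId", "type": "Integer"},
      {"name": "mfgDate", "type": "Date", "constraints": {"format": "dd/MM/yyyy"}}
    ]
  }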

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/571db6c0/contrib/src/test/resources/productmapping-delim.txt
----------------------------------------------------------------------
diff --git a/contrib/src/test/resources/productmapping-delim.txt b/contrib/src/test/resources/productmapping-delim.txt
new file mode 100755
index 0000000..1a685cb
--- /dev/null
+++ b/contrib/src/test/resources/productmapping-delim.txt
@@ -0,0 +1,100 @@
+5,0,01/01/2016
+4,1,01/01/2016
+5,2,01/01/2016
+5,3,01/01/2016
+5,4,01/01/2016
+1,5,01/01/2016
+2,6,01/01/2016
+4,7,01/01/2016
+2,8,01/01/2016
+3,9,01/01/2016
+1,10,01/01/2016
+5,11,01/01/2016
+5,12,01/01/2016
+1,13,01/01/2016
+1,14,01/01/2016
+2,15,01/01/2016
+3,16,01/01/2016
+5,17,01/01/2016
+2,18,01/01/2016
+2,19,01/01/2016
+2,20,01/01/2016
+3,21,01/01/2016
+2,22,01/01/2016
+5,23,01/01/2016
+4,24,01/01/2016
+1,25,01/01/2016
+3,26,01/01/2016
+3,27,01/01/2016
+3,28,01/01/2016
+5,29,01/01/2016
+2,30,01/01/2016
+3,31,01/01/2016
+3,32,01/01/2016
+3,33,01/01/2016
+1,34,01/01/2016
+3,35,01/01/2016
+2,36,01/01/2016
+1,37,01/01/2016
+3,38,01/01/2016
+2,39,01/01/2016
+1,40,01/01/2016
+5,41,01/01/2016
+3,42,01/01/2016
+5,43,01/01/2016
+2,44,01/01/2016
+4,45,01/01/2016
+5,46,01/01/2016
+2,47,01/01/2016
+3,48,01/01/2016
+5,49,01/01/2016
+5,50,01/01/2016
+4,51,01/01/2016
+5,52,01/01/2016
+1,53,01/01/2016
+5,54,01/01/2016
+4,55,01/01/2016
+4,56,01/01/2016
+2,57,01/01/2016
+4,58,01/01/2016
+4,59,01/01/2016
+4,60,01/01/2016
+1,61,01/01/2016
+2,62,01/01/2016
+3,63,01/01/2016
+5,64,01/01/2016
+1,65,01/01/2016
+5,66,01/01/2016
+5,67,01/01/2016
+2,68,01/01/2016
+3,69,01/01/2016
+3,70,01/01/2016
+2,71,01/01/2016
+3,72,01/01/2016
+4,73,01/01/2016
+2,74,01/01/2016
+3,75,01/01/2016
+3,76,01/01/2016
+4,77,01/01/2016
+5,78,01/01/2016
+4,79,01/01/2016
+1,80,01/01/2016
+1,81,01/01/2016
+1,82,01/01/2016
+3,83,01/01/2016
+1,84,01/01/2016
+5,85,01/01/2016
+3,86,01/01/2016
+4,87,01/01/2016
+1,88,01/01/2016
+5,89,01/01/2016
+3,90,01/01/2016
+5,91,01/01/2016
+2,92,01/01/2016
+2,93,01/01/2016
+3,94,01/01/2016
+1,95,01/01/2016
+1,96,01/01/2016
+5,97,01/01/2016
+3,98,01/01/2016
+5,99,01/01/2016