You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2016/06/01 17:53:18 UTC

[02/46] incubator-streams git commit: resolves STREAMS-377

resolves STREAMS-377


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/1b2891d4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/1b2891d4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/1b2891d4

Branch: refs/heads/STREAMS-389
Commit: 1b2891d43f428c8719e470554e8af8b8f674c7fe
Parents: c2229a4
Author: Steve Blackmon (@steveblackmon) <sb...@apache.org>
Authored: Sun Dec 13 14:24:06 2015 -0600
Committer: Steve Blackmon (@steveblackmon) <sb...@apache.org>
Committed: Sun Dec 13 14:24:06 2015 -0600

----------------------------------------------------------------------
 .../streams/converter/LineReadWriteUtil.java    | 66 +++++++-------------
 .../converter/LineReadWriteConfiguration.json   | 36 +++++++++++
 .../converter/test/TestLineReadWriteUtil.java   | 58 ++++++++---------
 3 files changed, 87 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/1b2891d4/streams-components/streams-converters/src/main/java/org/apache/streams/converter/LineReadWriteUtil.java
----------------------------------------------------------------------
diff --git a/streams-components/streams-converters/src/main/java/org/apache/streams/converter/LineReadWriteUtil.java b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/LineReadWriteUtil.java
index d418b52..a38568b 100644
--- a/streams-components/streams-converters/src/main/java/org/apache/streams/converter/LineReadWriteUtil.java
+++ b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/LineReadWriteUtil.java
@@ -24,13 +24,13 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Joiner;
 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.math.BigInteger;
 import java.util.Iterator;
 import java.util.List;
@@ -44,69 +44,45 @@ public class LineReadWriteUtil {
 
     private final static Logger LOGGER = LoggerFactory.getLogger(TypeConverterUtil.class);
 
-    private static LineReadWriteUtil INSTANCE;
+    private static Map<LineReadWriteConfiguration, LineReadWriteUtil> INSTANCE_MAP = Maps.newConcurrentMap();
 
     private final static List<String> DEFAULT_FIELDS = Lists.newArrayList("ID", "SEQ", "TS", "META", "DOC");
 
     private List<String> fields;
     private String fieldDelimiter = "\t";
     private String lineDelimiter = "\n";
+    private String encoding = "UTF-8";
 
     private static ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
 
     private LineReadWriteUtil() {
-        this(LineReadWriteUtil.DEFAULT_FIELDS);
     }
 
-    private LineReadWriteUtil(List<String> fields) {
-        if( fields != null && fields.size() > 0) this.fields = fields;
-        else this.fields = LineReadWriteUtil.DEFAULT_FIELDS;
+    private LineReadWriteUtil(LineReadWriteConfiguration configuration) {
+        this.fields = configuration.getFields();
+        this.fieldDelimiter = configuration.getFieldDelimiter();
+        this.lineDelimiter = configuration.getLineDelimiter();
+        this.encoding = configuration.getEncoding();
     }
 
-    private LineReadWriteUtil(List<String> fields, String fieldDelimiter) {
-        this(fields);
-        if( fieldDelimiter != null ) this.fieldDelimiter = fieldDelimiter;
+    public static LineReadWriteUtil getInstance() {
+        return getInstance(new LineReadWriteConfiguration());
     }
 
-    private LineReadWriteUtil(List<String> fields, String fieldDelimiter, String lineDelimiter) {
-        this(fields);
-        if( fieldDelimiter != null ) this.fieldDelimiter = fieldDelimiter;
-        if( lineDelimiter != null ) this.lineDelimiter = lineDelimiter;
-    }
-
-    public static LineReadWriteUtil getInstance(){
-        if( INSTANCE == null )
-            INSTANCE = new LineReadWriteUtil(LineReadWriteUtil.DEFAULT_FIELDS);
-        return INSTANCE;
-    }
-
-    public static LineReadWriteUtil getInstance(List<String> fields){
-        if( INSTANCE == null )
-            INSTANCE = new LineReadWriteUtil(fields);
-        else if( !INSTANCE.fields.equals(fields))
-            return new LineReadWriteUtil(fields);
-        return INSTANCE;
-    }
-
-    public static LineReadWriteUtil getInstance(List<String> fields, String fieldDelimiter){
-        if( INSTANCE == null )
-            INSTANCE = new LineReadWriteUtil(fields, fieldDelimiter);
-        else if( !INSTANCE.fields.equals(fields) || !INSTANCE.fieldDelimiter.equals(fieldDelimiter))
-            return new LineReadWriteUtil(fields, fieldDelimiter);
-        return INSTANCE;
-    }
-
-    public static LineReadWriteUtil getInstance(List<String> fields, String fieldDelimiter, String lineDelimiter){
-        if( INSTANCE == null )
-            INSTANCE = new LineReadWriteUtil(fields, fieldDelimiter, lineDelimiter);
-        else if( !INSTANCE.fields.equals(fields) || !INSTANCE.fieldDelimiter.equals(fieldDelimiter) || !INSTANCE.fieldDelimiter.equals(lineDelimiter))
-            return new LineReadWriteUtil(fields, fieldDelimiter, lineDelimiter);
-        return INSTANCE;
+    public static LineReadWriteUtil getInstance(LineReadWriteConfiguration configuration) {
+        if( INSTANCE_MAP.containsKey(configuration) &&
+            INSTANCE_MAP.get(configuration) != null)
+            return INSTANCE_MAP.get(configuration);
+        else {
+            INSTANCE_MAP.put(configuration, new LineReadWriteUtil(configuration));
+            return INSTANCE_MAP.get(configuration);
+        }
     }
 
     public StreamsDatum processLine(String line) {
 
         List<String> expectedFields = fields;
+        if( line.endsWith(lineDelimiter)) line = trimLineDelimiter(line);
         String[] parsedFields = line.split(fieldDelimiter);
 
         if( parsedFields.length == 0)
@@ -143,7 +119,7 @@ public class LineReadWriteUtil {
             metadata = parseMap(parsedFields[expectedFields.indexOf(FieldConstants.META)]);
         }
 
-        StreamsDatum datum = new StreamsDatum(trimLineDelimiter(json));
+        StreamsDatum datum = new StreamsDatum(json);
         datum.setId(id);
         datum.setTimestamp(ts);
         datum.setMetadata(metadata);
@@ -203,7 +179,7 @@ public class LineReadWriteUtil {
 
             }
             joiner.appendTo(stringBuilder, fielddata);
-            return stringBuilder.append(lineDelimiter).toString();
+            return stringBuilder.toString();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/1b2891d4/streams-components/streams-converters/src/main/jsonschema/org/apache/streams/converter/LineReadWriteConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-components/streams-converters/src/main/jsonschema/org/apache/streams/converter/LineReadWriteConfiguration.json b/streams-components/streams-converters/src/main/jsonschema/org/apache/streams/converter/LineReadWriteConfiguration.json
new file mode 100644
index 0000000..13e8428
--- /dev/null
+++ b/streams-components/streams-converters/src/main/jsonschema/org/apache/streams/converter/LineReadWriteConfiguration.json
@@ -0,0 +1,36 @@
+{
+  "$schema": "http://json-schema.org/draft-03/schema",
+  "$license": [
+    "http://www.apache.org/licenses/LICENSE-2.0"
+  ],
+  "id": "#",
+  "type": "object",
+  "javaType" : "org.apache.streams.converter.LineReadWriteConfiguration",
+  "javaInterfaces": ["java.io.Serializable"],
+  "properties": {
+    "fields": {
+      "type": "array",
+      "items": {
+        "type": "string"
+      },
+      "default": [
+        "ID",
+        "TS",
+        "META",
+        "DOC"
+      ]
+    },
+    "field_delimiter": {
+      "type": "string",
+      "default": "\t"
+    },
+    "line_delimiter": {
+      "type": "string",
+      "default": "\n"
+    },
+    "encoding": {
+      "type": "string",
+      "default": "UTF-8"
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/1b2891d4/streams-components/streams-converters/src/test/java/org/apache/streams/converter/test/TestLineReadWriteUtil.java
----------------------------------------------------------------------
diff --git a/streams-components/streams-converters/src/test/java/org/apache/streams/converter/test/TestLineReadWriteUtil.java b/streams-components/streams-converters/src/test/java/org/apache/streams/converter/test/TestLineReadWriteUtil.java
index ed61203..955eef7 100644
--- a/streams-components/streams-converters/src/test/java/org/apache/streams/converter/test/TestLineReadWriteUtil.java
+++ b/streams-components/streams-converters/src/test/java/org/apache/streams/converter/test/TestLineReadWriteUtil.java
@@ -22,24 +22,16 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import junit.framework.Assert;
-import org.apache.commons.io.Charsets;
-import org.apache.commons.io.IOUtils;
+import org.apache.streams.converter.LineReadWriteConfiguration;
 import org.apache.streams.converter.LineReadWriteUtil;
 import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsResultSet;
 import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.pojo.json.Activity;
 import org.joda.time.DateTime;
-import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.io.InputStream;
 import java.math.BigInteger;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
@@ -60,31 +52,41 @@ public class TestLineReadWriteUtil {
     @Test
     public void TestLineReadWrite () throws Exception {
 
-        List<List<String>> fieldArrays = Lists.newArrayList();
-        fieldArrays.add(Lists.newArrayList("ID"));
-        fieldArrays.add(Lists.newArrayList("DOC"));
-        fieldArrays.add(Lists.newArrayList("ID", "DOC"));
-        fieldArrays.add(Lists.newArrayList("ID", "TS", "DOC"));
-        fieldArrays.add(Lists.newArrayList("ID", "TS", "META", "DOC"));
-        fieldArrays.add(Lists.newArrayList("ID", "SEQ", "TS", "META", "DOC"));
-
-        TestLineReadWriteCase(fieldArrays.get(0), null, null);
-        TestLineReadWriteCase(fieldArrays.get(1), "\t", null );
-        TestLineReadWriteCase(fieldArrays.get(2), "\t", "\n" );
-        TestLineReadWriteCase(fieldArrays.get(3), null, "\n" );
-        TestLineReadWriteCase(fieldArrays.get(4), "|", "\n" );
-        TestLineReadWriteCase(fieldArrays.get(5), "|", "\\0" );
+        List<LineReadWriteConfiguration> configs = Lists.newArrayList();
+        configs.add(new LineReadWriteConfiguration());
+        configs.add(new LineReadWriteConfiguration()
+                .withFields(Lists.newArrayList("ID")));
+        configs.add(new LineReadWriteConfiguration()
+                .withFields(Lists.newArrayList("DOC"))
+                .withFieldDelimiter("\t"));
+        configs.add(new LineReadWriteConfiguration()
+                .withFields(Lists.newArrayList("ID", "DOC"))
+                .withFieldDelimiter("\t")
+                .withLineDelimiter("\n"));
+        configs.add(new LineReadWriteConfiguration()
+                .withFields(Lists.newArrayList("ID", "TS", "DOC"))
+                .withLineDelimiter("\n"));
+        configs.add(new LineReadWriteConfiguration()
+                .withFields(Lists.newArrayList("ID", "TS", "META", "DOC"))
+                .withFieldDelimiter("|")
+                .withLineDelimiter("\n"));
+        configs.add(new LineReadWriteConfiguration()
+                .withFields(Lists.newArrayList("ID", "SEQ", "TS", "META", "DOC"))
+                .withFieldDelimiter("|")
+                .withLineDelimiter("\\0"));
+
+        for(LineReadWriteConfiguration config : configs)
+            TestLineReadWriteCase(config);
 
     }
 
-    public void TestLineReadWriteCase(List<String> fields, String fieldDelimiter, String lineDelimiter) throws Exception {
+    public void TestLineReadWriteCase(LineReadWriteConfiguration lineReadWriteConfiguration) throws Exception {
 
         LineReadWriteUtil lineReadWriteUtil;
-        if( lineDelimiter != null && fieldDelimiter != null ) lineReadWriteUtil = LineReadWriteUtil.getInstance(fields, fieldDelimiter, lineDelimiter);
-        else if( lineDelimiter != null ) lineReadWriteUtil = LineReadWriteUtil.getInstance(fields, null, lineDelimiter);
-        else if( fieldDelimiter != null ) lineReadWriteUtil = LineReadWriteUtil.getInstance(fields, fieldDelimiter);
-        else lineReadWriteUtil = LineReadWriteUtil.getInstance(fields);
 
+        lineReadWriteUtil = LineReadWriteUtil.getInstance(lineReadWriteConfiguration);
+
+        assert(lineReadWriteUtil != null);
         StreamsDatum testDatum = randomDatum();
         String writeResult = lineReadWriteUtil.convertResultToString(testDatum);
         assert !Strings.isNullOrEmpty(writeResult);