You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2016/06/01 17:53:18 UTC
[02/46] incubator-streams git commit: resolves STREAMS-377
resolves STREAMS-377
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/1b2891d4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/1b2891d4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/1b2891d4
Branch: refs/heads/STREAMS-389
Commit: 1b2891d43f428c8719e470554e8af8b8f674c7fe
Parents: c2229a4
Author: Steve Blackmon (@steveblackmon) <sb...@apache.org>
Authored: Sun Dec 13 14:24:06 2015 -0600
Committer: Steve Blackmon (@steveblackmon) <sb...@apache.org>
Committed: Sun Dec 13 14:24:06 2015 -0600
----------------------------------------------------------------------
.../streams/converter/LineReadWriteUtil.java | 66 +++++++-------------
.../converter/LineReadWriteConfiguration.json | 36 +++++++++++
.../converter/test/TestLineReadWriteUtil.java | 58 ++++++++---------
3 files changed, 87 insertions(+), 73 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/1b2891d4/streams-components/streams-converters/src/main/java/org/apache/streams/converter/LineReadWriteUtil.java
----------------------------------------------------------------------
diff --git a/streams-components/streams-converters/src/main/java/org/apache/streams/converter/LineReadWriteUtil.java b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/LineReadWriteUtil.java
index d418b52..a38568b 100644
--- a/streams-components/streams-converters/src/main/java/org/apache/streams/converter/LineReadWriteUtil.java
+++ b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/LineReadWriteUtil.java
@@ -24,13 +24,13 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Joiner;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.jackson.StreamsJacksonMapper;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
import java.math.BigInteger;
import java.util.Iterator;
import java.util.List;
@@ -44,69 +44,45 @@ public class LineReadWriteUtil {
private final static Logger LOGGER = LoggerFactory.getLogger(TypeConverterUtil.class);
- private static LineReadWriteUtil INSTANCE;
+ private static Map<LineReadWriteConfiguration, LineReadWriteUtil> INSTANCE_MAP = Maps.newConcurrentMap();
private final static List<String> DEFAULT_FIELDS = Lists.newArrayList("ID", "SEQ", "TS", "META", "DOC");
private List<String> fields;
private String fieldDelimiter = "\t";
private String lineDelimiter = "\n";
+ private String encoding = "UTF-8";
private static ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
private LineReadWriteUtil() {
- this(LineReadWriteUtil.DEFAULT_FIELDS);
}
- private LineReadWriteUtil(List<String> fields) {
- if( fields != null && fields.size() > 0) this.fields = fields;
- else this.fields = LineReadWriteUtil.DEFAULT_FIELDS;
+ private LineReadWriteUtil(LineReadWriteConfiguration configuration) {
+ this.fields = configuration.getFields();
+ this.fieldDelimiter = configuration.getFieldDelimiter();
+ this.lineDelimiter = configuration.getLineDelimiter();
+ this.encoding = configuration.getEncoding();
}
- private LineReadWriteUtil(List<String> fields, String fieldDelimiter) {
- this(fields);
- if( fieldDelimiter != null ) this.fieldDelimiter = fieldDelimiter;
+ public static LineReadWriteUtil getInstance() {
+ return getInstance(new LineReadWriteConfiguration());
}
- private LineReadWriteUtil(List<String> fields, String fieldDelimiter, String lineDelimiter) {
- this(fields);
- if( fieldDelimiter != null ) this.fieldDelimiter = fieldDelimiter;
- if( lineDelimiter != null ) this.lineDelimiter = lineDelimiter;
- }
-
- public static LineReadWriteUtil getInstance(){
- if( INSTANCE == null )
- INSTANCE = new LineReadWriteUtil(LineReadWriteUtil.DEFAULT_FIELDS);
- return INSTANCE;
- }
-
- public static LineReadWriteUtil getInstance(List<String> fields){
- if( INSTANCE == null )
- INSTANCE = new LineReadWriteUtil(fields);
- else if( !INSTANCE.fields.equals(fields))
- return new LineReadWriteUtil(fields);
- return INSTANCE;
- }
-
- public static LineReadWriteUtil getInstance(List<String> fields, String fieldDelimiter){
- if( INSTANCE == null )
- INSTANCE = new LineReadWriteUtil(fields, fieldDelimiter);
- else if( !INSTANCE.fields.equals(fields) || !INSTANCE.fieldDelimiter.equals(fieldDelimiter))
- return new LineReadWriteUtil(fields, fieldDelimiter);
- return INSTANCE;
- }
-
- public static LineReadWriteUtil getInstance(List<String> fields, String fieldDelimiter, String lineDelimiter){
- if( INSTANCE == null )
- INSTANCE = new LineReadWriteUtil(fields, fieldDelimiter, lineDelimiter);
- else if( !INSTANCE.fields.equals(fields) || !INSTANCE.fieldDelimiter.equals(fieldDelimiter) || !INSTANCE.fieldDelimiter.equals(lineDelimiter))
- return new LineReadWriteUtil(fields, fieldDelimiter, lineDelimiter);
- return INSTANCE;
+ public static LineReadWriteUtil getInstance(LineReadWriteConfiguration configuration) {
+ if( INSTANCE_MAP.containsKey(configuration) &&
+ INSTANCE_MAP.get(configuration) != null)
+ return INSTANCE_MAP.get(configuration);
+ else {
+ INSTANCE_MAP.put(configuration, new LineReadWriteUtil(configuration));
+ return INSTANCE_MAP.get(configuration);
+ }
}
public StreamsDatum processLine(String line) {
List<String> expectedFields = fields;
+ if( line.endsWith(lineDelimiter)) line = trimLineDelimiter(line);
String[] parsedFields = line.split(fieldDelimiter);
if( parsedFields.length == 0)
@@ -143,7 +119,7 @@ public class LineReadWriteUtil {
metadata = parseMap(parsedFields[expectedFields.indexOf(FieldConstants.META)]);
}
- StreamsDatum datum = new StreamsDatum(trimLineDelimiter(json));
+ StreamsDatum datum = new StreamsDatum(json);
datum.setId(id);
datum.setTimestamp(ts);
datum.setMetadata(metadata);
@@ -203,7 +179,7 @@ public class LineReadWriteUtil {
}
joiner.appendTo(stringBuilder, fielddata);
- return stringBuilder.append(lineDelimiter).toString();
+ return stringBuilder.toString();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/1b2891d4/streams-components/streams-converters/src/main/jsonschema/org/apache/streams/converter/LineReadWriteConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-components/streams-converters/src/main/jsonschema/org/apache/streams/converter/LineReadWriteConfiguration.json b/streams-components/streams-converters/src/main/jsonschema/org/apache/streams/converter/LineReadWriteConfiguration.json
new file mode 100644
index 0000000..13e8428
--- /dev/null
+++ b/streams-components/streams-converters/src/main/jsonschema/org/apache/streams/converter/LineReadWriteConfiguration.json
@@ -0,0 +1,36 @@
+{
+ "$schema": "http://json-schema.org/draft-03/schema",
+ "$license": [
+ "http://www.apache.org/licenses/LICENSE-2.0"
+ ],
+ "id": "#",
+ "type": "object",
+ "javaType" : "org.apache.streams.converter.LineReadWriteConfiguration",
+ "javaInterfaces": ["java.io.Serializable"],
+ "properties": {
+ "fields": {
+ "type": "array",
+ "items": {
+ "type": "string"
+ },
+ "default": [
+ "ID",
+ "TS",
+ "META",
+ "DOC"
+ ]
+ },
+ "field_delimiter": {
+ "type": "string",
+ "default": "\t"
+ },
+ "line_delimiter": {
+ "type": "string",
+ "default": "\n"
+ },
+ "encoding": {
+ "type": "string",
+ "default": "UTF-8"
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/1b2891d4/streams-components/streams-converters/src/test/java/org/apache/streams/converter/test/TestLineReadWriteUtil.java
----------------------------------------------------------------------
diff --git a/streams-components/streams-converters/src/test/java/org/apache/streams/converter/test/TestLineReadWriteUtil.java b/streams-components/streams-converters/src/test/java/org/apache/streams/converter/test/TestLineReadWriteUtil.java
index ed61203..955eef7 100644
--- a/streams-components/streams-converters/src/test/java/org/apache/streams/converter/test/TestLineReadWriteUtil.java
+++ b/streams-components/streams-converters/src/test/java/org/apache/streams/converter/test/TestLineReadWriteUtil.java
@@ -22,24 +22,16 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-import junit.framework.Assert;
-import org.apache.commons.io.Charsets;
-import org.apache.commons.io.IOUtils;
+import org.apache.streams.converter.LineReadWriteConfiguration;
import org.apache.streams.converter.LineReadWriteUtil;
import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsResultSet;
import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.pojo.json.Activity;
import org.joda.time.DateTime;
-import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.File;
-import java.io.InputStream;
import java.math.BigInteger;
-import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
@@ -60,31 +52,41 @@ public class TestLineReadWriteUtil {
@Test
public void TestLineReadWrite () throws Exception {
- List<List<String>> fieldArrays = Lists.newArrayList();
- fieldArrays.add(Lists.newArrayList("ID"));
- fieldArrays.add(Lists.newArrayList("DOC"));
- fieldArrays.add(Lists.newArrayList("ID", "DOC"));
- fieldArrays.add(Lists.newArrayList("ID", "TS", "DOC"));
- fieldArrays.add(Lists.newArrayList("ID", "TS", "META", "DOC"));
- fieldArrays.add(Lists.newArrayList("ID", "SEQ", "TS", "META", "DOC"));
-
- TestLineReadWriteCase(fieldArrays.get(0), null, null);
- TestLineReadWriteCase(fieldArrays.get(1), "\t", null );
- TestLineReadWriteCase(fieldArrays.get(2), "\t", "\n" );
- TestLineReadWriteCase(fieldArrays.get(3), null, "\n" );
- TestLineReadWriteCase(fieldArrays.get(4), "|", "\n" );
- TestLineReadWriteCase(fieldArrays.get(5), "|", "\\0" );
+ List<LineReadWriteConfiguration> configs = Lists.newArrayList();
+ configs.add(new LineReadWriteConfiguration());
+ configs.add(new LineReadWriteConfiguration()
+ .withFields(Lists.newArrayList("ID")));
+ configs.add(new LineReadWriteConfiguration()
+ .withFields(Lists.newArrayList("DOC"))
+ .withFieldDelimiter("\t"));
+ configs.add(new LineReadWriteConfiguration()
+ .withFields(Lists.newArrayList("ID", "DOC"))
+ .withFieldDelimiter("\t")
+ .withLineDelimiter("\n"));
+ configs.add(new LineReadWriteConfiguration()
+ .withFields(Lists.newArrayList("ID", "TS", "DOC"))
+ .withLineDelimiter("\n"));
+ configs.add(new LineReadWriteConfiguration()
+ .withFields(Lists.newArrayList("ID", "TS", "META", "DOC"))
+ .withFieldDelimiter("|")
+ .withLineDelimiter("\n"));
+ configs.add(new LineReadWriteConfiguration()
+ .withFields(Lists.newArrayList("ID", "SEQ", "TS", "META", "DOC"))
+ .withFieldDelimiter("|")
+ .withLineDelimiter("\\0"));
+
+ for(LineReadWriteConfiguration config : configs)
+ TestLineReadWriteCase(config);
}
- public void TestLineReadWriteCase(List<String> fields, String fieldDelimiter, String lineDelimiter) throws Exception {
+ public void TestLineReadWriteCase(LineReadWriteConfiguration lineReadWriteConfiguration) throws Exception {
LineReadWriteUtil lineReadWriteUtil;
- if( lineDelimiter != null && fieldDelimiter != null ) lineReadWriteUtil = LineReadWriteUtil.getInstance(fields, fieldDelimiter, lineDelimiter);
- else if( lineDelimiter != null ) lineReadWriteUtil = LineReadWriteUtil.getInstance(fields, null, lineDelimiter);
- else if( fieldDelimiter != null ) lineReadWriteUtil = LineReadWriteUtil.getInstance(fields, fieldDelimiter);
- else lineReadWriteUtil = LineReadWriteUtil.getInstance(fields);
+ lineReadWriteUtil = LineReadWriteUtil.getInstance(lineReadWriteConfiguration);
+
+ assert(lineReadWriteUtil != null);
StreamsDatum testDatum = randomDatum();
String writeResult = lineReadWriteUtil.convertResultToString(testDatum);
assert !Strings.isNullOrEmpty(writeResult);