You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2015/09/04 18:43:47 UTC

[1/2] incubator-streams git commit: implements STREAMS-363 moves line format read/write out of streams-persist-hdfs to streams-converters this is better as it can now be used across contrib and runtime modules

Repository: incubator-streams
Updated Branches:
  refs/heads/STREAMS-363 [created] 5bc81c3c3


implements STREAMS-363
moves line format read/write out of streams-persist-hdfs to streams-converters
this is better as it can now be used across contrib and runtime modules


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/30e3eafe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/30e3eafe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/30e3eafe

Branch: refs/heads/STREAMS-363
Commit: 30e3eafe6b9fa58f85ae744d40057eaa6d41871c
Parents: a8e1cc7
Author: Steve Blackmon (@steveblackmon) <sb...@apache.org>
Authored: Thu Sep 3 21:37:23 2015 -0500
Committer: Steve Blackmon (@steveblackmon) <sb...@apache.org>
Committed: Thu Sep 3 21:37:23 2015 -0500

----------------------------------------------------------------------
 .../streams/converter/TypeConverterUtil.java    |  2 +-
 .../streams-persist-s3/pom.xml                  | 15 +++++
 .../org/apache/streams/s3/S3PersistReader.java  |  4 ++
 .../apache/streams/s3/S3PersistReaderTask.java  |  3 +-
 .../org/apache/streams/s3/S3PersistWriter.java  | 52 +++++---------
 .../org/apache/streams/s3/S3Configuration.json  | 20 ++++++
 .../streams/s3/S3WriterConfiguration.json       |  6 ++
 streams-contrib/streams-persist-hdfs/pom.xml    |  5 ++
 .../streams/hdfs/WebHdfsPersistReader.java      | 71 ++------------------
 .../streams/hdfs/WebHdfsPersistReaderTask.java  |  3 +-
 .../streams/hdfs/WebHdfsPersistWriter.java      | 66 +++---------------
 11 files changed, 85 insertions(+), 162 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/30e3eafe/streams-components/streams-converters/src/main/java/org/apache/streams/converter/TypeConverterUtil.java
----------------------------------------------------------------------
diff --git a/streams-components/streams-converters/src/main/java/org/apache/streams/converter/TypeConverterUtil.java b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/TypeConverterUtil.java
index 628a5a4..4ace9c4 100644
--- a/streams-components/streams-converters/src/main/java/org/apache/streams/converter/TypeConverterUtil.java
+++ b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/TypeConverterUtil.java
@@ -32,7 +32,7 @@ import java.io.IOException;
  */
 public class TypeConverterUtil {
 
-    private final static Logger LOGGER = LoggerFactory.getLogger(ActivityConverterUtil.class);
+    private final static Logger LOGGER = LoggerFactory.getLogger(TypeConverterUtil.class);
 
     private static final TypeConverterUtil INSTANCE = new TypeConverterUtil();
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/30e3eafe/streams-contrib/streams-amazon-aws/streams-persist-s3/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/pom.xml b/streams-contrib/streams-amazon-aws/streams-persist-s3/pom.xml
index cbfe1fd..ffc590e 100644
--- a/streams-contrib/streams-amazon-aws/streams-persist-s3/pom.xml
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/pom.xml
@@ -61,6 +61,21 @@
         </dependency>
         <dependency>
             <groupId>org.apache.streams</groupId>
+            <artifactId>streams-persist-hdfs</artifactId>
+            <version>${project.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>commons-logging</groupId>
+                    <artifactId>commons-logging</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
             <artifactId>streams-util</artifactId>
             <version>${project.version}</version>
         </dependency>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/30e3eafe/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
index 5709f22..e89b086 100644
--- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
@@ -31,6 +31,7 @@ import com.amazonaws.services.s3.model.S3ObjectSummary;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Strings;
 import com.google.common.collect.Queues;
+import org.apache.streams.converter.LineReaderUtil;
 import org.apache.streams.core.*;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
@@ -54,6 +55,7 @@ public class S3PersistReader implements StreamsPersistReader, DatumStatusCountab
     private S3ReaderConfiguration s3ReaderConfiguration;
     private AmazonS3Client amazonS3Client;
     private ObjectMapper mapper = new ObjectMapper();
+    protected LineReaderUtil lineReaderUtil;
     private Collection<String> files;
     private ExecutorService executor;
     protected volatile Queue<StreamsDatum> persistQueue;
@@ -100,6 +102,8 @@ public class S3PersistReader implements StreamsPersistReader, DatumStatusCountab
     }
 
     public void prepare(Object configurationObject) {
+
+        lineReaderUtil = LineReaderUtil.getInstance(s3ReaderConfiguration.getFields(), s3ReaderConfiguration.getFieldDelimiter(), s3ReaderConfiguration.getLineDelimiter());
         // Connect to S3
         synchronized (this)
         {

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/30e3eafe/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java
index 73763e6..f2f5567 100644
--- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java
@@ -53,8 +53,7 @@ public class S3PersistReaderTask implements Runnable {
                 while((line = bufferedReader.readLine()) != null) {
                     if( !Strings.isNullOrEmpty(line) ) {
                         reader.countersCurrent.incrementAttempt();
-                        String[] fields = line.split(Character.toString(reader.DELIMITER));
-                        StreamsDatum entry = new StreamsDatum(fields[3], fields[0]);
+                        StreamsDatum entry = reader.lineReaderUtil.processLine(line);
                         ComponentUtils.offerUntilSuccess(entry, reader.persistQueue);
                         reader.countersCurrent.incrementStatus(DatumStatus.SUCCESS);
                     }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/30e3eafe/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java
index 09f16ff..778b386 100644
--- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java
@@ -29,7 +29,12 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.converter.LineReaderUtil;
+import org.apache.streams.converter.LineWriterUtil;
 import org.apache.streams.core.*;
+import org.apache.streams.hdfs.WebHdfsPersistWriter;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -51,6 +56,7 @@ public class S3PersistWriter implements StreamsPersistWriter, DatumStatusCountab
     private AmazonS3Client amazonS3Client;
     private S3WriterConfiguration s3WriterConfiguration;
     private final List<String> writtenFiles = new ArrayList<String>();
+    protected LineWriterUtil lineWriterUtil;
 
     private final AtomicLong totalBytesWritten = new AtomicLong();
     private AtomicLong bytesWrittenThisFile = new AtomicLong();
@@ -95,6 +101,14 @@ public class S3PersistWriter implements StreamsPersistWriter, DatumStatusCountab
         this.objectMetaData = val;
     }
 
+    public S3PersistWriter() {
+        this(new ComponentConfigurator<>(S3WriterConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig().getConfig("s3")));
+    }
+
+    public S3PersistWriter(S3WriterConfiguration s3WriterConfiguration) {
+        this.s3WriterConfiguration = s3WriterConfiguration;
+    }
+
     /**
      * Instantiator with a pre-existing amazonS3Client, this is used to help with re-use.
      * @param amazonS3Client
@@ -107,10 +121,6 @@ public class S3PersistWriter implements StreamsPersistWriter, DatumStatusCountab
         this.s3WriterConfiguration = s3WriterConfiguration;
     }
 
-    public S3PersistWriter(S3WriterConfiguration s3WriterConfiguration) {
-        this.s3WriterConfiguration = s3WriterConfiguration;
-    }
-
     @Override
     public void write(StreamsDatum streamsDatum) {
 
@@ -125,7 +135,7 @@ public class S3PersistWriter implements StreamsPersistWriter, DatumStatusCountab
                 }
             }
 
-            String line = convertResultToString(streamsDatum);
+            String line = lineWriterUtil.convertResultToString(streamsDatum);
 
             try {
                 this.currentWriter.write(line);
@@ -145,7 +155,7 @@ public class S3PersistWriter implements StreamsPersistWriter, DatumStatusCountab
 
     }
 
-    private synchronized OutputStreamWriter resetFile() throws Exception {
+    public synchronized OutputStreamWriter resetFile() throws Exception {
         // this will keep it thread safe, so we don't create too many files
         if(this.fileLineCounter.get() == 0 && this.currentWriter != null)
             return this.currentWriter;
@@ -218,36 +228,10 @@ public class S3PersistWriter implements StreamsPersistWriter, DatumStatusCountab
         }
     }
 
-    private String convertResultToString(StreamsDatum entry)
-    {
-        String metadata = null;
-
-        try {
-            metadata = objectMapper.writeValueAsString(entry.getMetadata());
-        } catch (JsonProcessingException e) {
-            e.printStackTrace();
-        }
-
-        String documentJson = null;
-        try {
-            documentJson = objectMapper.writeValueAsString(entry.getDocument());
-        } catch (JsonProcessingException e) {
-            e.printStackTrace();
-        }
+    public void prepare(Object configurationObject) {
 
-        // Save the class name that it came from
-        entry.metadata.put("class", entry.getDocument().getClass().getName());
+        lineWriterUtil = LineWriterUtil.getInstance(s3WriterConfiguration.getFields(), s3WriterConfiguration.getFieldDelimiter(), s3WriterConfiguration.getLineDelimiter());
 
-        if(Strings.isNullOrEmpty(documentJson))
-            return null;
-        else
-            return  entry.getId()           + DELIMITER +   // [0] = Unique id of the verbatim
-                    entry.getTimestamp()    + DELIMITER +   // [1] = Timestamp of the item
-                    metadata                + DELIMITER +   // [2] = Metadata of the item
-                    documentJson            + "\n";         // [3] = The actual object
-    }
-
-    public void prepare(Object configurationObject) {
         // Connect to S3
         synchronized (this) {
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/30e3eafe/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/jsonschema/org/apache/streams/s3/S3Configuration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/jsonschema/org/apache/streams/s3/S3Configuration.json b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/jsonschema/org/apache/streams/s3/S3Configuration.json
index ae696df..7f2e9e5 100644
--- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/jsonschema/org/apache/streams/s3/S3Configuration.json
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/jsonschema/org/apache/streams/s3/S3Configuration.json
@@ -33,6 +33,26 @@
             "type": "string",
             "description": "The AWS region where your bucket resides",
             "required": false
+        },
+        "fields": {
+            "type": "array",
+            "items": {
+                "type": "string"
+            },
+            "default": [
+                "ID",
+                "TS",
+                "META",
+                "DOC"
+            ]
+        },
+        "field_delimiter": {
+            "type": "string",
+            "default": "\t"
+        },
+        "line_delimiter": {
+            "type": "string",
+            "default": "\n"
         }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/30e3eafe/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/jsonschema/org/apache/streams/s3/S3WriterConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/jsonschema/org/apache/streams/s3/S3WriterConfiguration.json b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/jsonschema/org/apache/streams/s3/S3WriterConfiguration.json
index 58d3055..b64a470 100644
--- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/jsonschema/org/apache/streams/s3/S3WriterConfiguration.json
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/jsonschema/org/apache/streams/s3/S3WriterConfiguration.json
@@ -26,6 +26,12 @@
             "type": "boolean",
             "default" : true,
             "description": "Whether you want the file chunked inside of a folder or not"
+        },
+        "compression": {
+            "type": "string",
+            "description": "compression",
+            "enum" : ["none", "gzip"],
+            "default": "none"
         }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/30e3eafe/streams-contrib/streams-persist-hdfs/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/pom.xml b/streams-contrib/streams-persist-hdfs/pom.xml
index 8d19141..b0b48cd 100644
--- a/streams-contrib/streams-persist-hdfs/pom.xml
+++ b/streams-contrib/streams-persist-hdfs/pom.xml
@@ -63,6 +63,11 @@
             <optional>true</optional>
         </dependency>
         <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-converters</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-common</artifactId>
             <version>${hdfs.version}</version>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/30e3eafe/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
index 72f3157..983740d 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
+++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
@@ -33,6 +33,8 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.streams.config.ComponentConfigurator;
 import org.apache.streams.config.StreamsConfiguration;
 import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.converter.LineReaderUtil;
+import org.apache.streams.converter.LineWriterUtil;
 import org.apache.streams.core.*;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.joda.time.DateTime;
@@ -71,6 +73,7 @@ public class WebHdfsPersistReader implements StreamsPersistReader, DatumStatusCo
     protected volatile Queue<StreamsDatum> persistQueue;
 
     protected ObjectMapper mapper;
+    protected LineReaderUtil lineReaderUtil;
 
     protected HdfsReaderConfiguration hdfsConfiguration;
     protected StreamsConfiguration streamsConfiguration;
@@ -163,6 +166,7 @@ public class WebHdfsPersistReader implements StreamsPersistReader, DatumStatusCo
     @Override
     public void prepare(Object configurationObject) {
         LOGGER.debug("Prepare");
+        lineReaderUtil = LineReaderUtil.getInstance(hdfsConfiguration.getFields(), hdfsConfiguration.getFieldDelimiter(), hdfsConfiguration.getLineDelimiter());
         connectToWebHDFS();
         String pathString = hdfsConfiguration.getPath() + "/" + hdfsConfiguration.getReaderPath();
         LOGGER.info("Path : {}", pathString);
@@ -231,73 +235,6 @@ public class WebHdfsPersistReader implements StreamsPersistReader, DatumStatusCo
         return current;
     }
 
-    public StreamsDatum processLine(String line) {
-
-        List<String> expectedFields = hdfsConfiguration.getFields();
-        String[] parsedFields = line.split(hdfsConfiguration.getFieldDelimiter());
-
-        if( parsedFields.length == 0)
-            return null;
-
-        String id = null;
-        DateTime ts = null;
-        Map<String, Object> metadata = null;
-        String json = null;
-
-        if( expectedFields.contains( HdfsConstants.DOC )
-                && parsedFields.length > expectedFields.indexOf(HdfsConstants.DOC)) {
-            json = parsedFields[expectedFields.indexOf(HdfsConstants.DOC)];
-        }
-
-        if( expectedFields.contains( HdfsConstants.ID )
-                && parsedFields.length > expectedFields.indexOf(HdfsConstants.ID)) {
-            id = parsedFields[expectedFields.indexOf(HdfsConstants.ID)];
-        }
-        if( expectedFields.contains( HdfsConstants.TS )
-                && parsedFields.length > expectedFields.indexOf(HdfsConstants.TS)) {
-            ts = parseTs(parsedFields[expectedFields.indexOf(HdfsConstants.TS)]);
-        }
-        if( expectedFields.contains( HdfsConstants.META )
-                && parsedFields.length > expectedFields.indexOf(HdfsConstants.META)) {
-            metadata = parseMap(parsedFields[expectedFields.indexOf(HdfsConstants.META)]);
-        }
-
-        StreamsDatum datum = new StreamsDatum(json);
-        datum.setId(id);
-        datum.setTimestamp(ts);
-        datum.setMetadata(metadata);
-
-        return datum;
-
-    }
-
-    public DateTime parseTs(String field) {
-
-        DateTime timestamp = null;
-        try {
-            long longts = Long.parseLong(field);
-            timestamp = new DateTime(longts);
-        } catch ( Exception e ) {}
-        try {
-            timestamp = mapper.readValue(field, DateTime.class);
-        } catch ( Exception e ) {}
-
-        return timestamp;
-    }
-
-    public Map<String, Object> parseMap(String field) {
-
-        Map<String, Object> metadata = null;
-
-        try {
-            JsonNode jsonNode = mapper.readValue(field, JsonNode.class);
-            metadata = mapper.convertValue(jsonNode, Map.class);
-        } catch (IOException e) {
-            LOGGER.warn("failed in parseMap: " + e.getMessage());
-        }
-        return metadata;
-    }
-
     protected void write( StreamsDatum entry ) {
         boolean success;
         do {

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/30e3eafe/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
index 9525131..d6b527d 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
+++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
@@ -23,6 +23,7 @@ import com.google.common.base.Strings;
 import com.google.common.collect.Maps;
 import com.google.common.util.concurrent.Uninterruptibles;
 import org.apache.hadoop.fs.FileStatus;
+import org.apache.streams.converter.LineReaderUtil;
 import org.apache.streams.core.DatumStatus;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.jackson.StreamsJacksonMapper;
@@ -83,7 +84,7 @@ public class WebHdfsPersistReaderTask implements Runnable {
                         line = bufferedReader.readLine();
                         if( !Strings.isNullOrEmpty(line) ) {
                             reader.countersCurrent.incrementAttempt();
-                            StreamsDatum entry = reader.processLine(line);
+                            StreamsDatum entry = reader.lineReaderUtil.processLine(line);
                             if( entry != null ) {
                                 reader.write(entry);
                                 reader.countersCurrent.incrementStatus(DatumStatus.SUCCESS);

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/30e3eafe/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
index 33598c2..50bb413 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
+++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
@@ -27,11 +27,13 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.streams.config.ComponentConfigurator;
 import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.converter.LineReaderUtil;
+import org.apache.streams.converter.LineWriterUtil;
 import org.apache.streams.core.*;
+import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -51,16 +53,13 @@ import java.util.Queue;
 import java.util.zip.GZIPOutputStream;
 
 public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Closeable, DatumStatusCountable {
+
     public final static String STREAMS_ID = "WebHdfsPersistWriter";
 
     private final static Logger LOGGER = LoggerFactory.getLogger(WebHdfsPersistWriter.class);
 
-    private final static char DELIMITER = '\t';
-    private final static int DEFAULT_LINES_PER_FILE = 50000;
-
     private FileSystem client;
     private Path path;
-    private String filePart = "default";
     private int linesPerFile;
     private int totalRecordsWritten = 0;
     private final List<Path> writtenFiles = new ArrayList<Path>();
@@ -76,7 +75,8 @@ public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Cl
 
     protected volatile Queue<StreamsDatum> persistQueue;
 
-    private ObjectMapper mapper = new ObjectMapper();
+    private ObjectMapper mapper;
+    private LineWriterUtil lineWriterUtil;
 
     protected HdfsWriterConfiguration hdfsConfiguration;
 
@@ -160,7 +160,7 @@ public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Cl
             if (this.currentWriter == null || (this.fileLineCounter > this.linesPerFile))
                 resetFile();
 
-            String line = convertResultToString(streamsDatum);
+            String line = lineWriterUtil.convertResultToString(streamsDatum);
             writeInternal(line);
             int bytesInLine = line.getBytes().length;
 
@@ -261,58 +261,10 @@ public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Cl
         }
     }
 
-    public String convertResultToString(StreamsDatum entry) {
-        String metadataJson = null;
-        try {
-            metadataJson = mapper.writeValueAsString(entry.getMetadata());
-        } catch (JsonProcessingException e) {
-            LOGGER.warn("Error converting metadata to a string", e);
-        }
-
-        String documentJson = null;
-        try {
-            if( entry.getDocument() instanceof String )
-                documentJson = (String)entry.getDocument();
-            else
-                documentJson = mapper.writeValueAsString(entry.getDocument());
-        } catch (JsonProcessingException e) {
-            LOGGER.warn("Error converting document to string", e);
-        }
-
-        if (Strings.isNullOrEmpty(documentJson))
-            return null;
-        else {
-            StringBuilder stringBuilder = new StringBuilder();
-            Iterator<String> fields = hdfsConfiguration.getFields().iterator();
-            List<String> fielddata = Lists.newArrayList();
-            Joiner joiner = Joiner.on(hdfsConfiguration.getFieldDelimiter()).useForNull("");
-            while( fields.hasNext() ) {
-                String field = fields.next();
-                if( field.equals(HdfsConstants.DOC) )
-                    fielddata.add(documentJson);
-                else if( field.equals(HdfsConstants.ID) )
-                    fielddata.add(entry.getId());
-                else if( field.equals(HdfsConstants.TS) )
-                    if( entry.getTimestamp() != null )
-                        fielddata.add(entry.getTimestamp().toString());
-                    else
-                        fielddata.add(DateTime.now().toString());
-                else if( field.equals(HdfsConstants.META) )
-                    fielddata.add(metadataJson);
-                else if( entry.getMetadata().containsKey(field)) {
-                    fielddata.add(entry.getMetadata().get(field).toString());
-                } else {
-                    fielddata.add(null);
-                }
-
-            }
-            joiner.appendTo(stringBuilder, fielddata);
-            return stringBuilder.append(hdfsConfiguration.getLineDelimiter()).toString();
-        }
-    }
-
     @Override
     public void prepare(Object configurationObject) {
+        mapper = StreamsJacksonMapper.getInstance();
+        lineWriterUtil = LineWriterUtil.getInstance(hdfsConfiguration.getFields(), hdfsConfiguration.getFieldDelimiter(), hdfsConfiguration.getLineDelimiter());
         connectToWebHDFS();
         path = new Path(hdfsConfiguration.getPath() + "/" + hdfsConfiguration.getWriterPath());
     }


[2/2] incubator-streams git commit: consolidated two Util classes to one adds testing

Posted by sb...@apache.org.
consolidated two Util classes to one
adds testing


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/5bc81c3c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/5bc81c3c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/5bc81c3c

Branch: refs/heads/STREAMS-363
Commit: 5bc81c3c379557370b1b65f56181d2e868925836
Parents: 30e3eaf
Author: Steve Blackmon (@steveblackmon) <sb...@apache.org>
Authored: Fri Sep 4 11:43:42 2015 -0500
Committer: Steve Blackmon (@steveblackmon) <sb...@apache.org>
Committed: Fri Sep 4 11:43:42 2015 -0500

----------------------------------------------------------------------
 .../streams/converter/FieldConstants.java       |  32 +++
 .../streams/converter/LineReadWriteUtil.java    | 227 +++++++++++++++++++
 .../converter/test/TestLineReadWriteUtil.java   | 112 +++++++++
 .../org/apache/streams/s3/S3PersistReader.java  |   6 +-
 .../org/apache/streams/s3/S3PersistWriter.java  |   7 +-
 .../streams/hdfs/WebHdfsPersistReader.java      |   7 +-
 .../streams/hdfs/WebHdfsPersistReaderTask.java  |   2 +-
 .../streams/hdfs/WebHdfsPersistWriter.java      |   7 +-
 8 files changed, 384 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5bc81c3c/streams-components/streams-converters/src/main/java/org/apache/streams/converter/FieldConstants.java
----------------------------------------------------------------------
diff --git a/streams-components/streams-converters/src/main/java/org/apache/streams/converter/FieldConstants.java b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/FieldConstants.java
new file mode 100644
index 0000000..26dfcb3
--- /dev/null
+++ b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/FieldConstants.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.converter;
+
+/**
+ * Predefined field symbols
+ */
+public class FieldConstants {
+
+    protected static final String ID = "ID";
+    protected static final String SEQ = "SEQ";
+    protected static final String TS = "TS";
+    protected static final String META = "META";
+    protected static final String DOC = "DOC";
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5bc81c3c/streams-components/streams-converters/src/main/java/org/apache/streams/converter/LineReadWriteUtil.java
----------------------------------------------------------------------
diff --git a/streams-components/streams-converters/src/main/java/org/apache/streams/converter/LineReadWriteUtil.java b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/LineReadWriteUtil.java
new file mode 100644
index 0000000..6ec1899
--- /dev/null
+++ b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/LineReadWriteUtil.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * with the License.  You may obtain a copy of the License at
+ *
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.converter;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Joiner;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.math.BigInteger;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+/**
+ * LineReadWriteUtil converts Datums to/from character array appropriate for writing to
+ * file systems.
+ */
+public class LineReadWriteUtil {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(TypeConverterUtil.class);
+
+    private static final LineReadWriteUtil INSTANCE = new LineReadWriteUtil();
+
+    private final static List<String> DEFAULT_FIELDS = Lists.newArrayList("ID", "SEQ", "TS", "META", "DOC");
+    private List<String> fields = DEFAULT_FIELDS;
+    private String fieldDelimiter = "\t";
+    private String lineDelimiter = "\n";
+
+    private static ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+
+    private LineReadWriteUtil() {
+    }
+
+    private LineReadWriteUtil(List<String> fields) {
+        if( fields.size() > 0) this.fields = fields;
+    }
+
+    private LineReadWriteUtil(List<String> fields, String fieldDelimiter) {
+        this(fields);
+        if( fieldDelimiter != null ) this.fieldDelimiter = fieldDelimiter;
+    }
+
+    private LineReadWriteUtil(List<String> fields, String fieldDelimiter, String lineDelimiter) {
+        this(fields);
+        if( fieldDelimiter != null ) this.fieldDelimiter = fieldDelimiter;
+        if( lineDelimiter != null ) this.lineDelimiter = lineDelimiter;
+    }
+
+    public static LineReadWriteUtil getInstance(){
+        return INSTANCE;
+    }
+
+    public static LineReadWriteUtil getInstance(List<String> fields){
+        return new LineReadWriteUtil(fields);
+    }
+
+    public static LineReadWriteUtil getInstance(List<String> fields, String fieldDelimiter){
+        return new LineReadWriteUtil(fields, fieldDelimiter);
+    }
+
+    public static LineReadWriteUtil getInstance(List<String> fields, String fieldDelimiter, String lineDelimiter){
+        return new LineReadWriteUtil(fields, fieldDelimiter, lineDelimiter);
+    }
+
+    public StreamsDatum processLine(String line) {
+
+        List<String> expectedFields = fields;
+        String[] parsedFields = line.split(fieldDelimiter);
+
+        if( parsedFields.length == 0)
+            return null;
+
+        String id = null;
+        DateTime ts = null;
+        BigInteger seq = null;
+        Map<String, Object> metadata = null;
+        String json = null;
+
+        if( expectedFields.contains( FieldConstants.DOC )
+                && parsedFields.length > expectedFields.indexOf(FieldConstants.DOC)) {
+            json = parsedFields[expectedFields.indexOf(FieldConstants.DOC)];
+        }
+
+        if( expectedFields.contains( FieldConstants.ID )
+                && parsedFields.length > expectedFields.indexOf(FieldConstants.ID)) {
+            id = parsedFields[expectedFields.indexOf(FieldConstants.ID)];
+        }
+        if( expectedFields.contains( FieldConstants.SEQ )
+                && parsedFields.length > expectedFields.indexOf(FieldConstants.SEQ)) {
+            seq = new BigInteger(parsedFields[expectedFields.indexOf(FieldConstants.SEQ)]);
+        }
+        if( expectedFields.contains( FieldConstants.TS )
+                && parsedFields.length > expectedFields.indexOf(FieldConstants.TS)) {
+            ts = parseTs(parsedFields[expectedFields.indexOf(FieldConstants.TS)]);
+        }
+        if( expectedFields.contains( FieldConstants.META )
+                && parsedFields.length > expectedFields.indexOf(FieldConstants.META)) {
+            metadata = parseMap(parsedFields[expectedFields.indexOf(FieldConstants.META)]);
+        }
+
+        StreamsDatum datum = new StreamsDatum(trimLineDelimiter(json));
+        datum.setId(id);
+        datum.setTimestamp(ts);
+        datum.setMetadata(metadata);
+        datum.setSequenceid(seq);
+        return datum;
+
+    }
+
+    public String convertResultToString(StreamsDatum entry) {
+        String metadataJson = null;
+        try {
+            metadataJson = MAPPER.writeValueAsString(entry.getMetadata());
+        } catch (JsonProcessingException e) {
+            LOGGER.warn("Error converting metadata to a string", e);
+        }
+
+        String documentJson = null;
+        try {
+            if( entry.getDocument() instanceof String )
+                documentJson = (String)entry.getDocument();
+            else
+                documentJson = MAPPER.writeValueAsString(entry.getDocument());
+        } catch (JsonProcessingException e) {
+            LOGGER.warn("Error converting document to string", e);
+        }
+
+        if (Strings.isNullOrEmpty(documentJson))
+            return null;
+        else {
+            StringBuilder stringBuilder = new StringBuilder();
+            Iterator<String> fields = this.fields.iterator();
+            List<String> fielddata = Lists.newArrayList();
+            Joiner joiner = Joiner.on(fieldDelimiter).useForNull("");
+            while( fields.hasNext() ) {
+                String field = fields.next();
+                if( field.equals(FieldConstants.DOC) )
+                    fielddata.add(documentJson);
+                else if( field.equals(FieldConstants.ID) )
+                    fielddata.add(entry.getId());
+                else if( field.equals(FieldConstants.SEQ) )
+                    fielddata.add(entry.getSequenceid().toString());
+                else if( field.equals(FieldConstants.TS) )
+                    if( entry.getTimestamp() != null )
+                        fielddata.add(entry.getTimestamp().toString());
+                    else
+                        fielddata.add(DateTime.now().toString());
+                else if( field.equals(FieldConstants.META) )
+                    fielddata.add(metadataJson);
+                else if( entry.getMetadata().containsKey(field)) {
+                    fielddata.add(entry.getMetadata().get(field).toString());
+                } else {
+                    fielddata.add(null);
+                }
+
+            }
+            joiner.appendTo(stringBuilder, fielddata);
+            return stringBuilder.append(lineDelimiter).toString();
+        }
+    }
+
+    public DateTime parseTs(String field) {
+
+        DateTime timestamp = null;
+        try {
+            long longts = Long.parseLong(field);
+            timestamp = new DateTime(longts);
+        } catch ( Exception e ) {
+            try {
+                timestamp = DateTime.parse(field);
+            } catch ( Exception e2 ) {
+                try {
+                    timestamp = MAPPER.readValue(field, DateTime.class);
+                } catch ( Exception e3 ) {
+                    LOGGER.warn("Could not parse timestamp:{} ", field);
+                }
+            }
+        }
+
+        return timestamp;
+    }
+
+    public Map<String, Object> parseMap(String field) {
+
+        Map<String, Object> metadata = null;
+
+        try {
+            JsonNode jsonNode = MAPPER.readValue(field, JsonNode.class);
+            metadata = MAPPER.convertValue(jsonNode, Map.class);
+        } catch (IOException e) {
+            LOGGER.warn("failed in parseMap: " + e.getMessage());
+        }
+        return metadata;
+    }
+
+    private String trimLineDelimiter(String str) {
+        if( !Strings.isNullOrEmpty(str))
+            if( str.endsWith(lineDelimiter))
+                return str.substring(0,str.length()-1);
+        return str;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5bc81c3c/streams-components/streams-converters/src/test/java/org/apache/streams/converter/test/TestLineReadWriteUtil.java
----------------------------------------------------------------------
diff --git a/streams-components/streams-converters/src/test/java/org/apache/streams/converter/test/TestLineReadWriteUtil.java b/streams-components/streams-converters/src/test/java/org/apache/streams/converter/test/TestLineReadWriteUtil.java
new file mode 100644
index 0000000..463b88d
--- /dev/null
+++ b/streams-components/streams-converters/src/test/java/org/apache/streams/converter/test/TestLineReadWriteUtil.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.converter.test;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import junit.framework.Assert;
+import org.apache.commons.io.Charsets;
+import org.apache.commons.io.IOUtils;
+import org.apache.streams.converter.LineReadWriteUtil;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+import org.joda.time.DateTime;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.InputStream;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+
+/**
+ * Tests for
+ * @see {@link LineReadWriteUtil}
+ */
+public class TestLineReadWriteUtil {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(TestLineReadWriteUtil.class);
+
+    ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+
+    private static Random rand = new Random();
+
+    @Test
+    public void TestLineReadWrite () throws Exception {
+
+        List<List<String>> fieldArrays = Lists.newArrayList();
+        fieldArrays.add(new ArrayList<String>());
+        fieldArrays.add(Lists.newArrayList("ID"));
+        fieldArrays.add(Lists.newArrayList("DOC"));
+        fieldArrays.add(Lists.newArrayList("ID", "DOC"));
+        fieldArrays.add(Lists.newArrayList("ID", "TS", "DOC"));
+        fieldArrays.add(Lists.newArrayList("ID", "TS", "META", "DOC"));
+        fieldArrays.add(Lists.newArrayList("ID", "SEQ", "TS", "META", "DOC"));
+
+        TestLineReadWriteCase(fieldArrays.get(0), null, null);
+        TestLineReadWriteCase(fieldArrays.get(1), "\t", null );
+        TestLineReadWriteCase(fieldArrays.get(2), "\t", "\n" );
+        TestLineReadWriteCase(fieldArrays.get(3), null, "\n" );
+        TestLineReadWriteCase(fieldArrays.get(4), "|", "\n" );
+        TestLineReadWriteCase(fieldArrays.get(5), "|", "\\0" );
+
+    }
+
+    public void TestLineReadWriteCase(List<String> fields, String fieldDelimiter, String lineDelimiter) throws Exception {
+
+        LineReadWriteUtil lineReadWriteUtil;
+        if( lineDelimiter != null && fieldDelimiter != null ) lineReadWriteUtil = LineReadWriteUtil.getInstance(fields, fieldDelimiter, lineDelimiter);
+        else if( lineDelimiter != null ) lineReadWriteUtil = LineReadWriteUtil.getInstance(fields, null, lineDelimiter);
+        else if( fieldDelimiter != null ) lineReadWriteUtil = LineReadWriteUtil.getInstance(fields, fieldDelimiter);
+        lineReadWriteUtil = LineReadWriteUtil.getInstance(fields);
+
+        StreamsDatum testDatum = randomDatum();
+        String writeResult = lineReadWriteUtil.convertResultToString(testDatum);
+        assert !Strings.isNullOrEmpty(writeResult);
+        StreamsDatum readResult = lineReadWriteUtil.processLine(writeResult);
+        assert readResult != null;
+        assert !Strings.isNullOrEmpty(readResult.getId()) || !Strings.isNullOrEmpty((String)readResult.getDocument());
+        if( fields.size() == 5 )
+            Assert.assertEquals(testDatum.toString(), readResult.toString());
+
+    }
+
+    public static StreamsDatum randomDatum() {
+
+        StreamsDatum datum = new StreamsDatum(UUID.randomUUID().toString());
+        datum.setId(UUID.randomUUID().toString());
+        datum.setTimestamp(DateTime.now());
+        BigInteger result = new BigInteger(64, rand);
+        datum.setSequenceid(result);
+        Map<String,Object> metadata = Maps.newHashMap();
+        metadata.put("a", UUID.randomUUID().toString());
+        datum.setMetadata(metadata);
+        return datum;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5bc81c3c/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
index e89b086..6efa150 100644
--- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
@@ -31,7 +31,7 @@ import com.amazonaws.services.s3.model.S3ObjectSummary;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Strings;
 import com.google.common.collect.Queues;
-import org.apache.streams.converter.LineReaderUtil;
+import org.apache.streams.converter.LineReadWriteUtil;
 import org.apache.streams.core.*;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
@@ -55,7 +55,7 @@ public class S3PersistReader implements StreamsPersistReader, DatumStatusCountab
     private S3ReaderConfiguration s3ReaderConfiguration;
     private AmazonS3Client amazonS3Client;
     private ObjectMapper mapper = new ObjectMapper();
-    protected LineReaderUtil lineReaderUtil;
+    protected LineReadWriteUtil lineReaderUtil;
     private Collection<String> files;
     private ExecutorService executor;
     protected volatile Queue<StreamsDatum> persistQueue;
@@ -103,7 +103,7 @@ public class S3PersistReader implements StreamsPersistReader, DatumStatusCountab
 
     public void prepare(Object configurationObject) {
 
-        lineReaderUtil = LineReaderUtil.getInstance(s3ReaderConfiguration.getFields(), s3ReaderConfiguration.getFieldDelimiter(), s3ReaderConfiguration.getLineDelimiter());
+        lineReaderUtil = LineReadWriteUtil.getInstance(s3ReaderConfiguration.getFields(), s3ReaderConfiguration.getFieldDelimiter(), s3ReaderConfiguration.getLineDelimiter());
         // Connect to S3
         synchronized (this)
         {

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5bc81c3c/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java
index 778b386..c5b041b 100644
--- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java
@@ -31,8 +31,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import org.apache.streams.config.ComponentConfigurator;
 import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.converter.LineReaderUtil;
-import org.apache.streams.converter.LineWriterUtil;
+import org.apache.streams.converter.LineReadWriteUtil;
 import org.apache.streams.core.*;
 import org.apache.streams.hdfs.WebHdfsPersistWriter;
 import org.apache.streams.jackson.StreamsJacksonMapper;
@@ -56,7 +55,7 @@ public class S3PersistWriter implements StreamsPersistWriter, DatumStatusCountab
     private AmazonS3Client amazonS3Client;
     private S3WriterConfiguration s3WriterConfiguration;
     private final List<String> writtenFiles = new ArrayList<String>();
-    protected LineWriterUtil lineWriterUtil;
+    protected LineReadWriteUtil lineWriterUtil;
 
     private final AtomicLong totalBytesWritten = new AtomicLong();
     private AtomicLong bytesWrittenThisFile = new AtomicLong();
@@ -230,7 +229,7 @@ public class S3PersistWriter implements StreamsPersistWriter, DatumStatusCountab
 
     public void prepare(Object configurationObject) {
 
-        lineWriterUtil = LineWriterUtil.getInstance(s3WriterConfiguration.getFields(), s3WriterConfiguration.getFieldDelimiter(), s3WriterConfiguration.getLineDelimiter());
+        lineWriterUtil = LineReadWriteUtil.getInstance(s3WriterConfiguration.getFields(), s3WriterConfiguration.getFieldDelimiter(), s3WriterConfiguration.getLineDelimiter());
 
         // Connect to S3
         synchronized (this) {

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5bc81c3c/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
index 983740d..0937bf2 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
+++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
@@ -33,8 +33,7 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.streams.config.ComponentConfigurator;
 import org.apache.streams.config.StreamsConfiguration;
 import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.converter.LineReaderUtil;
-import org.apache.streams.converter.LineWriterUtil;
+import org.apache.streams.converter.LineReadWriteUtil;
 import org.apache.streams.core.*;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.joda.time.DateTime;
@@ -73,7 +72,7 @@ public class WebHdfsPersistReader implements StreamsPersistReader, DatumStatusCo
     protected volatile Queue<StreamsDatum> persistQueue;
 
     protected ObjectMapper mapper;
-    protected LineReaderUtil lineReaderUtil;
+    protected LineReadWriteUtil lineReaderUtil;
 
     protected HdfsReaderConfiguration hdfsConfiguration;
     protected StreamsConfiguration streamsConfiguration;
@@ -166,7 +165,7 @@ public class WebHdfsPersistReader implements StreamsPersistReader, DatumStatusCo
     @Override
     public void prepare(Object configurationObject) {
         LOGGER.debug("Prepare");
-        lineReaderUtil = LineReaderUtil.getInstance(hdfsConfiguration.getFields(), hdfsConfiguration.getFieldDelimiter(), hdfsConfiguration.getLineDelimiter());
+        lineReaderUtil = LineReadWriteUtil.getInstance(hdfsConfiguration.getFields(), hdfsConfiguration.getFieldDelimiter(), hdfsConfiguration.getLineDelimiter());
         connectToWebHDFS();
         String pathString = hdfsConfiguration.getPath() + "/" + hdfsConfiguration.getReaderPath();
         LOGGER.info("Path : {}", pathString);

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5bc81c3c/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
index d6b527d..c5c1ffe 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
+++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
@@ -23,7 +23,7 @@ import com.google.common.base.Strings;
 import com.google.common.collect.Maps;
 import com.google.common.util.concurrent.Uninterruptibles;
 import org.apache.hadoop.fs.FileStatus;
-import org.apache.streams.converter.LineReaderUtil;
+import org.apache.streams.converter.LineReadWriteUtil;
 import org.apache.streams.core.DatumStatus;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.jackson.StreamsJacksonMapper;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5bc81c3c/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
index 50bb413..34df8ae 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
+++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
@@ -30,8 +30,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.streams.config.ComponentConfigurator;
 import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.converter.LineReaderUtil;
-import org.apache.streams.converter.LineWriterUtil;
+import org.apache.streams.converter.LineReadWriteUtil;
 import org.apache.streams.core.*;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.joda.time.DateTime;
@@ -76,7 +75,7 @@ public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Cl
     protected volatile Queue<StreamsDatum> persistQueue;
 
     private ObjectMapper mapper;
-    private LineWriterUtil lineWriterUtil;
+    private LineReadWriteUtil lineWriterUtil;
 
     protected HdfsWriterConfiguration hdfsConfiguration;
 
@@ -264,7 +263,7 @@ public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Cl
     @Override
     public void prepare(Object configurationObject) {
         mapper = StreamsJacksonMapper.getInstance();
-        lineWriterUtil = LineWriterUtil.getInstance(hdfsConfiguration.getFields(), hdfsConfiguration.getFieldDelimiter(), hdfsConfiguration.getLineDelimiter());
+        lineWriterUtil = LineReadWriteUtil.getInstance(hdfsConfiguration.getFields(), hdfsConfiguration.getFieldDelimiter(), hdfsConfiguration.getLineDelimiter());
         connectToWebHDFS();
         path = new Path(hdfsConfiguration.getPath() + "/" + hdfsConfiguration.getWriterPath());
     }