You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2015/09/04 18:43:48 UTC

[2/2] incubator-streams git commit: consolidated two Util classes to one adds testing

consolidated two Util classes to one
adds testing


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/5bc81c3c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/5bc81c3c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/5bc81c3c

Branch: refs/heads/STREAMS-363
Commit: 5bc81c3c379557370b1b65f56181d2e868925836
Parents: 30e3eaf
Author: Steve Blackmon (@steveblackmon) <sb...@apache.org>
Authored: Fri Sep 4 11:43:42 2015 -0500
Committer: Steve Blackmon (@steveblackmon) <sb...@apache.org>
Committed: Fri Sep 4 11:43:42 2015 -0500

----------------------------------------------------------------------
 .../streams/converter/FieldConstants.java       |  32 +++
 .../streams/converter/LineReadWriteUtil.java    | 227 +++++++++++++++++++
 .../converter/test/TestLineReadWriteUtil.java   | 112 +++++++++
 .../org/apache/streams/s3/S3PersistReader.java  |   6 +-
 .../org/apache/streams/s3/S3PersistWriter.java  |   7 +-
 .../streams/hdfs/WebHdfsPersistReader.java      |   7 +-
 .../streams/hdfs/WebHdfsPersistReaderTask.java  |   2 +-
 .../streams/hdfs/WebHdfsPersistWriter.java      |   7 +-
 8 files changed, 384 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5bc81c3c/streams-components/streams-converters/src/main/java/org/apache/streams/converter/FieldConstants.java
----------------------------------------------------------------------
diff --git a/streams-components/streams-converters/src/main/java/org/apache/streams/converter/FieldConstants.java b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/FieldConstants.java
new file mode 100644
index 0000000..26dfcb3
--- /dev/null
+++ b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/FieldConstants.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.converter;
+
+/**
+ * Field-name symbols recognized by LineReadWriteUtil when reading or writing delimited datum lines.
+ */
+public class FieldConstants {
+
+    protected static final String ID = "ID";
+    protected static final String SEQ = "SEQ";
+    protected static final String TS = "TS";
+    protected static final String META = "META";
+    protected static final String DOC = "DOC";
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5bc81c3c/streams-components/streams-converters/src/main/java/org/apache/streams/converter/LineReadWriteUtil.java
----------------------------------------------------------------------
diff --git a/streams-components/streams-converters/src/main/java/org/apache/streams/converter/LineReadWriteUtil.java b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/LineReadWriteUtil.java
new file mode 100644
index 0000000..6ec1899
--- /dev/null
+++ b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/LineReadWriteUtil.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.converter;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Joiner;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.math.BigInteger;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * LineReadWriteUtil converts Datums to/from a delimited character line appropriate for
+ * writing to and reading from file systems.
+ */
+public class LineReadWriteUtil {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(LineReadWriteUtil.class);
+
+    private static final LineReadWriteUtil INSTANCE = new LineReadWriteUtil();
+
+    private final static List<String> DEFAULT_FIELDS = Lists.newArrayList("ID", "SEQ", "TS", "META", "DOC");
+    private List<String> fields = DEFAULT_FIELDS;
+    private String fieldDelimiter = "\t";
+    private String lineDelimiter = "\n";
+
+    private static final ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+
+    private LineReadWriteUtil() {
+    }
+
+    private LineReadWriteUtil(List<String> fields) {
+        if( fields != null && fields.size() > 0) this.fields = fields;
+    }
+
+    private LineReadWriteUtil(List<String> fields, String fieldDelimiter) {
+        this(fields);
+        if( fieldDelimiter != null ) this.fieldDelimiter = fieldDelimiter;
+    }
+
+    private LineReadWriteUtil(List<String> fields, String fieldDelimiter, String lineDelimiter) {
+        this(fields);
+        if( fieldDelimiter != null ) this.fieldDelimiter = fieldDelimiter;
+        if( lineDelimiter != null ) this.lineDelimiter = lineDelimiter;
+    }
+
+    public static LineReadWriteUtil getInstance(){
+        return INSTANCE;
+    }
+
+    public static LineReadWriteUtil getInstance(List<String> fields){
+        return new LineReadWriteUtil(fields);
+    }
+
+    public static LineReadWriteUtil getInstance(List<String> fields, String fieldDelimiter){
+        return new LineReadWriteUtil(fields, fieldDelimiter);
+    }
+
+    public static LineReadWriteUtil getInstance(List<String> fields, String fieldDelimiter, String lineDelimiter){
+        return new LineReadWriteUtil(fields, fieldDelimiter, lineDelimiter);
+    }
+
+    public StreamsDatum processLine(String line) {
+
+        List<String> expectedFields = fields;
+        String[] parsedFields = line.split(java.util.regex.Pattern.quote(fieldDelimiter));
+
+        if( parsedFields.length == 0)
+            return null;
+
+        String id = null;
+        DateTime ts = null;
+        BigInteger seq = null;
+        Map<String, Object> metadata = null;
+        String json = null;
+
+        if( expectedFields.contains( FieldConstants.DOC )
+                && parsedFields.length > expectedFields.indexOf(FieldConstants.DOC)) {
+            json = parsedFields[expectedFields.indexOf(FieldConstants.DOC)];
+        }
+
+        if( expectedFields.contains( FieldConstants.ID )
+                && parsedFields.length > expectedFields.indexOf(FieldConstants.ID)) {
+            id = parsedFields[expectedFields.indexOf(FieldConstants.ID)];
+        }
+        if( expectedFields.contains( FieldConstants.SEQ )
+                && parsedFields.length > expectedFields.indexOf(FieldConstants.SEQ)) {
+            seq = new BigInteger(parsedFields[expectedFields.indexOf(FieldConstants.SEQ)]);
+        }
+        if( expectedFields.contains( FieldConstants.TS )
+                && parsedFields.length > expectedFields.indexOf(FieldConstants.TS)) {
+            ts = parseTs(parsedFields[expectedFields.indexOf(FieldConstants.TS)]);
+        }
+        if( expectedFields.contains( FieldConstants.META )
+                && parsedFields.length > expectedFields.indexOf(FieldConstants.META)) {
+            metadata = parseMap(parsedFields[expectedFields.indexOf(FieldConstants.META)]);
+        }
+
+        StreamsDatum datum = new StreamsDatum(trimLineDelimiter(json));
+        datum.setId(id);
+        datum.setTimestamp(ts);
+        datum.setMetadata(metadata);
+        datum.setSequenceid(seq);
+        return datum;
+
+    }
+
+    public String convertResultToString(StreamsDatum entry) {
+        String metadataJson = null;
+        try {
+            metadataJson = MAPPER.writeValueAsString(entry.getMetadata());
+        } catch (JsonProcessingException e) {
+            LOGGER.warn("Error converting metadata to a string", e);
+        }
+
+        String documentJson = null;
+        try {
+            if( entry.getDocument() instanceof String )
+                documentJson = (String)entry.getDocument();
+            else
+                documentJson = MAPPER.writeValueAsString(entry.getDocument());
+        } catch (JsonProcessingException e) {
+            LOGGER.warn("Error converting document to string", e);
+        }
+
+        if (Strings.isNullOrEmpty(documentJson))
+            return null;
+        else {
+            StringBuilder stringBuilder = new StringBuilder();
+            Iterator<String> fields = this.fields.iterator();
+            List<String> fielddata = Lists.newArrayList();
+            Joiner joiner = Joiner.on(fieldDelimiter).useForNull("");
+            while( fields.hasNext() ) {
+                String field = fields.next();
+                if( field.equals(FieldConstants.DOC) )
+                    fielddata.add(documentJson);
+                else if( field.equals(FieldConstants.ID) )
+                    fielddata.add(entry.getId());
+                else if( field.equals(FieldConstants.SEQ) )
+                    fielddata.add(entry.getSequenceid() == null ? null : entry.getSequenceid().toString());
+                else if( field.equals(FieldConstants.TS) )
+                    if( entry.getTimestamp() != null )
+                        fielddata.add(entry.getTimestamp().toString());
+                    else
+                        fielddata.add(DateTime.now().toString());
+                else if( field.equals(FieldConstants.META) )
+                    fielddata.add(metadataJson);
+                else if( entry.getMetadata().containsKey(field)) {
+                    fielddata.add(entry.getMetadata().get(field).toString());
+                } else {
+                    fielddata.add(null);
+                }
+
+            }
+            joiner.appendTo(stringBuilder, fielddata);
+            return stringBuilder.append(lineDelimiter).toString();
+        }
+    }
+
+    public DateTime parseTs(String field) {
+
+        DateTime timestamp = null;
+        try {
+            long longts = Long.parseLong(field);
+            timestamp = new DateTime(longts);
+        } catch ( Exception e ) {
+            try {
+                timestamp = DateTime.parse(field);
+            } catch ( Exception e2 ) {
+                try {
+                    timestamp = MAPPER.readValue(field, DateTime.class);
+                } catch ( Exception e3 ) {
+                    LOGGER.warn("Could not parse timestamp:{} ", field);
+                }
+            }
+        }
+
+        return timestamp;
+    }
+
+    public Map<String, Object> parseMap(String field) {
+
+        Map<String, Object> metadata = null;
+
+        try {
+            JsonNode jsonNode = MAPPER.readValue(field, JsonNode.class);
+            metadata = MAPPER.convertValue(jsonNode, Map.class);
+        } catch (IOException e) {
+            LOGGER.warn("failed in parseMap: " + e.getMessage());
+        }
+        return metadata;
+    }
+
+    private String trimLineDelimiter(String str) {
+        if( !Strings.isNullOrEmpty(str))
+            if( str.endsWith(lineDelimiter))
+                return str.substring(0, str.length() - lineDelimiter.length());
+        return str;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5bc81c3c/streams-components/streams-converters/src/test/java/org/apache/streams/converter/test/TestLineReadWriteUtil.java
----------------------------------------------------------------------
diff --git a/streams-components/streams-converters/src/test/java/org/apache/streams/converter/test/TestLineReadWriteUtil.java b/streams-components/streams-converters/src/test/java/org/apache/streams/converter/test/TestLineReadWriteUtil.java
new file mode 100644
index 0000000..463b88d
--- /dev/null
+++ b/streams-components/streams-converters/src/test/java/org/apache/streams/converter/test/TestLineReadWriteUtil.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.converter.test;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import junit.framework.Assert;
+import org.apache.commons.io.Charsets;
+import org.apache.commons.io.IOUtils;
+import org.apache.streams.converter.LineReadWriteUtil;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+import org.joda.time.DateTime;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.InputStream;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+
+/**
+ * Unit tests for
+ * {@link org.apache.streams.converter.LineReadWriteUtil}
+ */
+public class TestLineReadWriteUtil {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(TestLineReadWriteUtil.class);
+
+    ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+
+    private static Random rand = new Random();
+
+    @Test
+    public void TestLineReadWrite () throws Exception {
+
+        List<List<String>> fieldArrays = Lists.newArrayList();
+        fieldArrays.add(new ArrayList<String>());
+        fieldArrays.add(Lists.newArrayList("ID"));
+        fieldArrays.add(Lists.newArrayList("DOC"));
+        fieldArrays.add(Lists.newArrayList("ID", "DOC"));
+        fieldArrays.add(Lists.newArrayList("ID", "TS", "DOC"));
+        fieldArrays.add(Lists.newArrayList("ID", "TS", "META", "DOC"));
+        fieldArrays.add(Lists.newArrayList("ID", "SEQ", "TS", "META", "DOC"));
+
+        TestLineReadWriteCase(fieldArrays.get(0), null, null);
+        TestLineReadWriteCase(fieldArrays.get(1), "\t", null );
+        TestLineReadWriteCase(fieldArrays.get(2), "\t", "\n" );
+        TestLineReadWriteCase(fieldArrays.get(3), null, "\n" );
+        TestLineReadWriteCase(fieldArrays.get(4), "|", "\n" );
+        TestLineReadWriteCase(fieldArrays.get(5), "|", "\\0" );
+
+    }
+
+    public void TestLineReadWriteCase(List<String> fields, String fieldDelimiter, String lineDelimiter) throws Exception {
+
+        LineReadWriteUtil lineReadWriteUtil;
+        if( lineDelimiter != null && fieldDelimiter != null ) lineReadWriteUtil = LineReadWriteUtil.getInstance(fields, fieldDelimiter, lineDelimiter);
+        else if( lineDelimiter != null ) lineReadWriteUtil = LineReadWriteUtil.getInstance(fields, null, lineDelimiter);
+        else if( fieldDelimiter != null ) lineReadWriteUtil = LineReadWriteUtil.getInstance(fields, fieldDelimiter);
+        else lineReadWriteUtil = LineReadWriteUtil.getInstance(fields);
+
+        StreamsDatum testDatum = randomDatum();
+        String writeResult = lineReadWriteUtil.convertResultToString(testDatum);
+        assert !Strings.isNullOrEmpty(writeResult);
+        StreamsDatum readResult = lineReadWriteUtil.processLine(writeResult);
+        assert readResult != null;
+        assert !Strings.isNullOrEmpty(readResult.getId()) || !Strings.isNullOrEmpty((String)readResult.getDocument());
+        if( fields.size() == 5 )
+            Assert.assertEquals(testDatum.toString(), readResult.toString());
+
+    }
+
+    public static StreamsDatum randomDatum() {
+
+        StreamsDatum datum = new StreamsDatum(UUID.randomUUID().toString());
+        datum.setId(UUID.randomUUID().toString());
+        datum.setTimestamp(DateTime.now());
+        BigInteger result = new BigInteger(64, rand);
+        datum.setSequenceid(result);
+        Map<String,Object> metadata = Maps.newHashMap();
+        metadata.put("a", UUID.randomUUID().toString());
+        datum.setMetadata(metadata);
+        return datum;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5bc81c3c/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
index e89b086..6efa150 100644
--- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
@@ -31,7 +31,7 @@ import com.amazonaws.services.s3.model.S3ObjectSummary;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Strings;
 import com.google.common.collect.Queues;
-import org.apache.streams.converter.LineReaderUtil;
+import org.apache.streams.converter.LineReadWriteUtil;
 import org.apache.streams.core.*;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
@@ -55,7 +55,7 @@ public class S3PersistReader implements StreamsPersistReader, DatumStatusCountab
     private S3ReaderConfiguration s3ReaderConfiguration;
     private AmazonS3Client amazonS3Client;
     private ObjectMapper mapper = new ObjectMapper();
-    protected LineReaderUtil lineReaderUtil;
+    protected LineReadWriteUtil lineReaderUtil;
     private Collection<String> files;
     private ExecutorService executor;
     protected volatile Queue<StreamsDatum> persistQueue;
@@ -103,7 +103,7 @@ public class S3PersistReader implements StreamsPersistReader, DatumStatusCountab
 
     public void prepare(Object configurationObject) {
 
-        lineReaderUtil = LineReaderUtil.getInstance(s3ReaderConfiguration.getFields(), s3ReaderConfiguration.getFieldDelimiter(), s3ReaderConfiguration.getLineDelimiter());
+        lineReaderUtil = LineReadWriteUtil.getInstance(s3ReaderConfiguration.getFields(), s3ReaderConfiguration.getFieldDelimiter(), s3ReaderConfiguration.getLineDelimiter());
         // Connect to S3
         synchronized (this)
         {

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5bc81c3c/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java
index 778b386..c5b041b 100644
--- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java
@@ -31,8 +31,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import org.apache.streams.config.ComponentConfigurator;
 import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.converter.LineReaderUtil;
-import org.apache.streams.converter.LineWriterUtil;
+import org.apache.streams.converter.LineReadWriteUtil;
 import org.apache.streams.core.*;
 import org.apache.streams.hdfs.WebHdfsPersistWriter;
 import org.apache.streams.jackson.StreamsJacksonMapper;
@@ -56,7 +55,7 @@ public class S3PersistWriter implements StreamsPersistWriter, DatumStatusCountab
     private AmazonS3Client amazonS3Client;
     private S3WriterConfiguration s3WriterConfiguration;
     private final List<String> writtenFiles = new ArrayList<String>();
-    protected LineWriterUtil lineWriterUtil;
+    protected LineReadWriteUtil lineWriterUtil;
 
     private final AtomicLong totalBytesWritten = new AtomicLong();
     private AtomicLong bytesWrittenThisFile = new AtomicLong();
@@ -230,7 +229,7 @@ public class S3PersistWriter implements StreamsPersistWriter, DatumStatusCountab
 
     public void prepare(Object configurationObject) {
 
-        lineWriterUtil = LineWriterUtil.getInstance(s3WriterConfiguration.getFields(), s3WriterConfiguration.getFieldDelimiter(), s3WriterConfiguration.getLineDelimiter());
+        lineWriterUtil = LineReadWriteUtil.getInstance(s3WriterConfiguration.getFields(), s3WriterConfiguration.getFieldDelimiter(), s3WriterConfiguration.getLineDelimiter());
 
         // Connect to S3
         synchronized (this) {

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5bc81c3c/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
index 983740d..0937bf2 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
+++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
@@ -33,8 +33,7 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.streams.config.ComponentConfigurator;
 import org.apache.streams.config.StreamsConfiguration;
 import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.converter.LineReaderUtil;
-import org.apache.streams.converter.LineWriterUtil;
+import org.apache.streams.converter.LineReadWriteUtil;
 import org.apache.streams.core.*;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.joda.time.DateTime;
@@ -73,7 +72,7 @@ public class WebHdfsPersistReader implements StreamsPersistReader, DatumStatusCo
     protected volatile Queue<StreamsDatum> persistQueue;
 
     protected ObjectMapper mapper;
-    protected LineReaderUtil lineReaderUtil;
+    protected LineReadWriteUtil lineReaderUtil;
 
     protected HdfsReaderConfiguration hdfsConfiguration;
     protected StreamsConfiguration streamsConfiguration;
@@ -166,7 +165,7 @@ public class WebHdfsPersistReader implements StreamsPersistReader, DatumStatusCo
     @Override
     public void prepare(Object configurationObject) {
         LOGGER.debug("Prepare");
-        lineReaderUtil = LineReaderUtil.getInstance(hdfsConfiguration.getFields(), hdfsConfiguration.getFieldDelimiter(), hdfsConfiguration.getLineDelimiter());
+        lineReaderUtil = LineReadWriteUtil.getInstance(hdfsConfiguration.getFields(), hdfsConfiguration.getFieldDelimiter(), hdfsConfiguration.getLineDelimiter());
         connectToWebHDFS();
         String pathString = hdfsConfiguration.getPath() + "/" + hdfsConfiguration.getReaderPath();
         LOGGER.info("Path : {}", pathString);

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5bc81c3c/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
index d6b527d..c5c1ffe 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
+++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
@@ -23,7 +23,7 @@ import com.google.common.base.Strings;
 import com.google.common.collect.Maps;
 import com.google.common.util.concurrent.Uninterruptibles;
 import org.apache.hadoop.fs.FileStatus;
-import org.apache.streams.converter.LineReaderUtil;
+import org.apache.streams.converter.LineReadWriteUtil;
 import org.apache.streams.core.DatumStatus;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.jackson.StreamsJacksonMapper;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5bc81c3c/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
index 50bb413..34df8ae 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
+++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
@@ -30,8 +30,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.streams.config.ComponentConfigurator;
 import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.converter.LineReaderUtil;
-import org.apache.streams.converter.LineWriterUtil;
+import org.apache.streams.converter.LineReadWriteUtil;
 import org.apache.streams.core.*;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.joda.time.DateTime;
@@ -76,7 +75,7 @@ public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Cl
     protected volatile Queue<StreamsDatum> persistQueue;
 
     private ObjectMapper mapper;
-    private LineWriterUtil lineWriterUtil;
+    private LineReadWriteUtil lineWriterUtil;
 
     protected HdfsWriterConfiguration hdfsConfiguration;
 
@@ -264,7 +263,7 @@ public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Cl
     @Override
     public void prepare(Object configurationObject) {
         mapper = StreamsJacksonMapper.getInstance();
-        lineWriterUtil = LineWriterUtil.getInstance(hdfsConfiguration.getFields(), hdfsConfiguration.getFieldDelimiter(), hdfsConfiguration.getLineDelimiter());
+        lineWriterUtil = LineReadWriteUtil.getInstance(hdfsConfiguration.getFields(), hdfsConfiguration.getFieldDelimiter(), hdfsConfiguration.getLineDelimiter());
         connectToWebHDFS();
         path = new Path(hdfsConfiguration.getPath() + "/" + hdfsConfiguration.getWriterPath());
     }