You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2015/09/04 18:43:48 UTC
[2/2] incubator-streams git commit: consolidated two Util classes to
one adds testing
consolidated two Util classes to one
adds testing
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/5bc81c3c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/5bc81c3c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/5bc81c3c
Branch: refs/heads/STREAMS-363
Commit: 5bc81c3c379557370b1b65f56181d2e868925836
Parents: 30e3eaf
Author: Steve Blackmon (@steveblackmon) <sb...@apache.org>
Authored: Fri Sep 4 11:43:42 2015 -0500
Committer: Steve Blackmon (@steveblackmon) <sb...@apache.org>
Committed: Fri Sep 4 11:43:42 2015 -0500
----------------------------------------------------------------------
.../streams/converter/FieldConstants.java | 32 +++
.../streams/converter/LineReadWriteUtil.java | 227 +++++++++++++++++++
.../converter/test/TestLineReadWriteUtil.java | 112 +++++++++
.../org/apache/streams/s3/S3PersistReader.java | 6 +-
.../org/apache/streams/s3/S3PersistWriter.java | 7 +-
.../streams/hdfs/WebHdfsPersistReader.java | 7 +-
.../streams/hdfs/WebHdfsPersistReaderTask.java | 2 +-
.../streams/hdfs/WebHdfsPersistWriter.java | 7 +-
8 files changed, 384 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5bc81c3c/streams-components/streams-converters/src/main/java/org/apache/streams/converter/FieldConstants.java
----------------------------------------------------------------------
diff --git a/streams-components/streams-converters/src/main/java/org/apache/streams/converter/FieldConstants.java b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/FieldConstants.java
new file mode 100644
index 0000000..26dfcb3
--- /dev/null
+++ b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/FieldConstants.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.converter;
+
+/**
+ * Predefined field symbols
+ */
+public class FieldConstants {
+
+ protected static final String ID = "ID";
+ protected static final String SEQ = "SEQ";
+ protected static final String TS = "TS";
+ protected static final String META = "META";
+ protected static final String DOC = "DOC";
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5bc81c3c/streams-components/streams-converters/src/main/java/org/apache/streams/converter/LineReadWriteUtil.java
----------------------------------------------------------------------
diff --git a/streams-components/streams-converters/src/main/java/org/apache/streams/converter/LineReadWriteUtil.java b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/LineReadWriteUtil.java
new file mode 100644
index 0000000..6ec1899
--- /dev/null
+++ b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/LineReadWriteUtil.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.converter;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Joiner;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.math.BigInteger;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * LineReadWriteUtil converts Datums to/from character array appropriate for writing to
+ * file systems.
+ */
+public class LineReadWriteUtil {
+
+ private final static Logger LOGGER = LoggerFactory.getLogger(TypeConverterUtil.class);
+
+ private static final LineReadWriteUtil INSTANCE = new LineReadWriteUtil();
+
+ private final static List<String> DEFAULT_FIELDS = Lists.newArrayList("ID", "SEQ", "TS", "META", "DOC");
+ private List<String> fields = DEFAULT_FIELDS;
+ private String fieldDelimiter = "\t";
+ private String lineDelimiter = "\n";
+
+ private static ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+
+ private LineReadWriteUtil() {
+ }
+
+ private LineReadWriteUtil(List<String> fields) {
+ if( fields.size() > 0) this.fields = fields;
+ }
+
+ private LineReadWriteUtil(List<String> fields, String fieldDelimiter) {
+ this(fields);
+ if( fieldDelimiter != null ) this.fieldDelimiter = fieldDelimiter;
+ }
+
+ private LineReadWriteUtil(List<String> fields, String fieldDelimiter, String lineDelimiter) {
+ this(fields);
+ if( fieldDelimiter != null ) this.fieldDelimiter = fieldDelimiter;
+ if( lineDelimiter != null ) this.lineDelimiter = lineDelimiter;
+ }
+
+ public static LineReadWriteUtil getInstance(){
+ return INSTANCE;
+ }
+
+ public static LineReadWriteUtil getInstance(List<String> fields){
+ return new LineReadWriteUtil(fields);
+ }
+
+ public static LineReadWriteUtil getInstance(List<String> fields, String fieldDelimiter){
+ return new LineReadWriteUtil(fields, fieldDelimiter);
+ }
+
+ public static LineReadWriteUtil getInstance(List<String> fields, String fieldDelimiter, String lineDelimiter){
+ return new LineReadWriteUtil(fields, fieldDelimiter, lineDelimiter);
+ }
+
+ public StreamsDatum processLine(String line) {
+
+ List<String> expectedFields = fields;
+ String[] parsedFields = line.split(fieldDelimiter);
+
+ if( parsedFields.length == 0)
+ return null;
+
+ String id = null;
+ DateTime ts = null;
+ BigInteger seq = null;
+ Map<String, Object> metadata = null;
+ String json = null;
+
+ if( expectedFields.contains( FieldConstants.DOC )
+ && parsedFields.length > expectedFields.indexOf(FieldConstants.DOC)) {
+ json = parsedFields[expectedFields.indexOf(FieldConstants.DOC)];
+ }
+
+ if( expectedFields.contains( FieldConstants.ID )
+ && parsedFields.length > expectedFields.indexOf(FieldConstants.ID)) {
+ id = parsedFields[expectedFields.indexOf(FieldConstants.ID)];
+ }
+ if( expectedFields.contains( FieldConstants.SEQ )
+ && parsedFields.length > expectedFields.indexOf(FieldConstants.SEQ)) {
+ seq = new BigInteger(parsedFields[expectedFields.indexOf(FieldConstants.SEQ)]);
+ }
+ if( expectedFields.contains( FieldConstants.TS )
+ && parsedFields.length > expectedFields.indexOf(FieldConstants.TS)) {
+ ts = parseTs(parsedFields[expectedFields.indexOf(FieldConstants.TS)]);
+ }
+ if( expectedFields.contains( FieldConstants.META )
+ && parsedFields.length > expectedFields.indexOf(FieldConstants.META)) {
+ metadata = parseMap(parsedFields[expectedFields.indexOf(FieldConstants.META)]);
+ }
+
+ StreamsDatum datum = new StreamsDatum(trimLineDelimiter(json));
+ datum.setId(id);
+ datum.setTimestamp(ts);
+ datum.setMetadata(metadata);
+ datum.setSequenceid(seq);
+ return datum;
+
+ }
+
+ public String convertResultToString(StreamsDatum entry) {
+ String metadataJson = null;
+ try {
+ metadataJson = MAPPER.writeValueAsString(entry.getMetadata());
+ } catch (JsonProcessingException e) {
+ LOGGER.warn("Error converting metadata to a string", e);
+ }
+
+ String documentJson = null;
+ try {
+ if( entry.getDocument() instanceof String )
+ documentJson = (String)entry.getDocument();
+ else
+ documentJson = MAPPER.writeValueAsString(entry.getDocument());
+ } catch (JsonProcessingException e) {
+ LOGGER.warn("Error converting document to string", e);
+ }
+
+ if (Strings.isNullOrEmpty(documentJson))
+ return null;
+ else {
+ StringBuilder stringBuilder = new StringBuilder();
+ Iterator<String> fields = this.fields.iterator();
+ List<String> fielddata = Lists.newArrayList();
+ Joiner joiner = Joiner.on(fieldDelimiter).useForNull("");
+ while( fields.hasNext() ) {
+ String field = fields.next();
+ if( field.equals(FieldConstants.DOC) )
+ fielddata.add(documentJson);
+ else if( field.equals(FieldConstants.ID) )
+ fielddata.add(entry.getId());
+ else if( field.equals(FieldConstants.SEQ) )
+ fielddata.add(entry.getSequenceid().toString());
+ else if( field.equals(FieldConstants.TS) )
+ if( entry.getTimestamp() != null )
+ fielddata.add(entry.getTimestamp().toString());
+ else
+ fielddata.add(DateTime.now().toString());
+ else if( field.equals(FieldConstants.META) )
+ fielddata.add(metadataJson);
+ else if( entry.getMetadata().containsKey(field)) {
+ fielddata.add(entry.getMetadata().get(field).toString());
+ } else {
+ fielddata.add(null);
+ }
+
+ }
+ joiner.appendTo(stringBuilder, fielddata);
+ return stringBuilder.append(lineDelimiter).toString();
+ }
+ }
+
+ public DateTime parseTs(String field) {
+
+ DateTime timestamp = null;
+ try {
+ long longts = Long.parseLong(field);
+ timestamp = new DateTime(longts);
+ } catch ( Exception e ) {
+ try {
+ timestamp = DateTime.parse(field);
+ } catch ( Exception e2 ) {
+ try {
+ timestamp = MAPPER.readValue(field, DateTime.class);
+ } catch ( Exception e3 ) {
+ LOGGER.warn("Could not parse timestamp:{} ", field);
+ }
+ }
+ }
+
+ return timestamp;
+ }
+
+ public Map<String, Object> parseMap(String field) {
+
+ Map<String, Object> metadata = null;
+
+ try {
+ JsonNode jsonNode = MAPPER.readValue(field, JsonNode.class);
+ metadata = MAPPER.convertValue(jsonNode, Map.class);
+ } catch (IOException e) {
+ LOGGER.warn("failed in parseMap: " + e.getMessage());
+ }
+ return metadata;
+ }
+
+ private String trimLineDelimiter(String str) {
+ if( !Strings.isNullOrEmpty(str))
+ if( str.endsWith(lineDelimiter))
+ return str.substring(0,str.length()-1);
+ return str;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5bc81c3c/streams-components/streams-converters/src/test/java/org/apache/streams/converter/test/TestLineReadWriteUtil.java
----------------------------------------------------------------------
diff --git a/streams-components/streams-converters/src/test/java/org/apache/streams/converter/test/TestLineReadWriteUtil.java b/streams-components/streams-converters/src/test/java/org/apache/streams/converter/test/TestLineReadWriteUtil.java
new file mode 100644
index 0000000..463b88d
--- /dev/null
+++ b/streams-components/streams-converters/src/test/java/org/apache/streams/converter/test/TestLineReadWriteUtil.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.converter.test;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import junit.framework.Assert;
+import org.apache.commons.io.Charsets;
+import org.apache.commons.io.IOUtils;
+import org.apache.streams.converter.LineReadWriteUtil;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+import org.joda.time.DateTime;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.InputStream;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+
+/**
+ * Tests for
+ * @see {@link LineReadWriteUtil}
+ */
+public class TestLineReadWriteUtil {
+
+ private final static Logger LOGGER = LoggerFactory.getLogger(TestLineReadWriteUtil.class);
+
+ ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+
+ private static Random rand = new Random();
+
+ @Test
+ public void TestLineReadWrite () throws Exception {
+
+ List<List<String>> fieldArrays = Lists.newArrayList();
+ fieldArrays.add(new ArrayList<String>());
+ fieldArrays.add(Lists.newArrayList("ID"));
+ fieldArrays.add(Lists.newArrayList("DOC"));
+ fieldArrays.add(Lists.newArrayList("ID", "DOC"));
+ fieldArrays.add(Lists.newArrayList("ID", "TS", "DOC"));
+ fieldArrays.add(Lists.newArrayList("ID", "TS", "META", "DOC"));
+ fieldArrays.add(Lists.newArrayList("ID", "SEQ", "TS", "META", "DOC"));
+
+ TestLineReadWriteCase(fieldArrays.get(0), null, null);
+ TestLineReadWriteCase(fieldArrays.get(1), "\t", null );
+ TestLineReadWriteCase(fieldArrays.get(2), "\t", "\n" );
+ TestLineReadWriteCase(fieldArrays.get(3), null, "\n" );
+ TestLineReadWriteCase(fieldArrays.get(4), "|", "\n" );
+ TestLineReadWriteCase(fieldArrays.get(5), "|", "\\0" );
+
+ }
+
+ public void TestLineReadWriteCase(List<String> fields, String fieldDelimiter, String lineDelimiter) throws Exception {
+
+ LineReadWriteUtil lineReadWriteUtil;
+ if( lineDelimiter != null && fieldDelimiter != null ) lineReadWriteUtil = LineReadWriteUtil.getInstance(fields, fieldDelimiter, lineDelimiter);
+ else if( lineDelimiter != null ) lineReadWriteUtil = LineReadWriteUtil.getInstance(fields, null, lineDelimiter);
+ else if( fieldDelimiter != null ) lineReadWriteUtil = LineReadWriteUtil.getInstance(fields, fieldDelimiter);
+ lineReadWriteUtil = LineReadWriteUtil.getInstance(fields);
+
+ StreamsDatum testDatum = randomDatum();
+ String writeResult = lineReadWriteUtil.convertResultToString(testDatum);
+ assert !Strings.isNullOrEmpty(writeResult);
+ StreamsDatum readResult = lineReadWriteUtil.processLine(writeResult);
+ assert readResult != null;
+ assert !Strings.isNullOrEmpty(readResult.getId()) || !Strings.isNullOrEmpty((String)readResult.getDocument());
+ if( fields.size() == 5 )
+ Assert.assertEquals(testDatum.toString(), readResult.toString());
+
+ }
+
+ public static StreamsDatum randomDatum() {
+
+ StreamsDatum datum = new StreamsDatum(UUID.randomUUID().toString());
+ datum.setId(UUID.randomUUID().toString());
+ datum.setTimestamp(DateTime.now());
+ BigInteger result = new BigInteger(64, rand);
+ datum.setSequenceid(result);
+ Map<String,Object> metadata = Maps.newHashMap();
+ metadata.put("a", UUID.randomUUID().toString());
+ datum.setMetadata(metadata);
+ return datum;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5bc81c3c/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
index e89b086..6efa150 100644
--- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
@@ -31,7 +31,7 @@ import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Strings;
import com.google.common.collect.Queues;
-import org.apache.streams.converter.LineReaderUtil;
+import org.apache.streams.converter.LineReadWriteUtil;
import org.apache.streams.core.*;
import org.joda.time.DateTime;
import org.slf4j.Logger;
@@ -55,7 +55,7 @@ public class S3PersistReader implements StreamsPersistReader, DatumStatusCountab
private S3ReaderConfiguration s3ReaderConfiguration;
private AmazonS3Client amazonS3Client;
private ObjectMapper mapper = new ObjectMapper();
- protected LineReaderUtil lineReaderUtil;
+ protected LineReadWriteUtil lineReaderUtil;
private Collection<String> files;
private ExecutorService executor;
protected volatile Queue<StreamsDatum> persistQueue;
@@ -103,7 +103,7 @@ public class S3PersistReader implements StreamsPersistReader, DatumStatusCountab
public void prepare(Object configurationObject) {
- lineReaderUtil = LineReaderUtil.getInstance(s3ReaderConfiguration.getFields(), s3ReaderConfiguration.getFieldDelimiter(), s3ReaderConfiguration.getLineDelimiter());
+ lineReaderUtil = LineReadWriteUtil.getInstance(s3ReaderConfiguration.getFields(), s3ReaderConfiguration.getFieldDelimiter(), s3ReaderConfiguration.getLineDelimiter());
// Connect to S3
synchronized (this)
{
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5bc81c3c/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java
index 778b386..c5b041b 100644
--- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java
@@ -31,8 +31,7 @@ import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import org.apache.streams.config.ComponentConfigurator;
import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.converter.LineReaderUtil;
-import org.apache.streams.converter.LineWriterUtil;
+import org.apache.streams.converter.LineReadWriteUtil;
import org.apache.streams.core.*;
import org.apache.streams.hdfs.WebHdfsPersistWriter;
import org.apache.streams.jackson.StreamsJacksonMapper;
@@ -56,7 +55,7 @@ public class S3PersistWriter implements StreamsPersistWriter, DatumStatusCountab
private AmazonS3Client amazonS3Client;
private S3WriterConfiguration s3WriterConfiguration;
private final List<String> writtenFiles = new ArrayList<String>();
- protected LineWriterUtil lineWriterUtil;
+ protected LineReadWriteUtil lineWriterUtil;
private final AtomicLong totalBytesWritten = new AtomicLong();
private AtomicLong bytesWrittenThisFile = new AtomicLong();
@@ -230,7 +229,7 @@ public class S3PersistWriter implements StreamsPersistWriter, DatumStatusCountab
public void prepare(Object configurationObject) {
- lineWriterUtil = LineWriterUtil.getInstance(s3WriterConfiguration.getFields(), s3WriterConfiguration.getFieldDelimiter(), s3WriterConfiguration.getLineDelimiter());
+ lineWriterUtil = LineReadWriteUtil.getInstance(s3WriterConfiguration.getFields(), s3WriterConfiguration.getFieldDelimiter(), s3WriterConfiguration.getLineDelimiter());
// Connect to S3
synchronized (this) {
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5bc81c3c/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
index 983740d..0937bf2 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
+++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
@@ -33,8 +33,7 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.streams.config.ComponentConfigurator;
import org.apache.streams.config.StreamsConfiguration;
import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.converter.LineReaderUtil;
-import org.apache.streams.converter.LineWriterUtil;
+import org.apache.streams.converter.LineReadWriteUtil;
import org.apache.streams.core.*;
import org.apache.streams.jackson.StreamsJacksonMapper;
import org.joda.time.DateTime;
@@ -73,7 +72,7 @@ public class WebHdfsPersistReader implements StreamsPersistReader, DatumStatusCo
protected volatile Queue<StreamsDatum> persistQueue;
protected ObjectMapper mapper;
- protected LineReaderUtil lineReaderUtil;
+ protected LineReadWriteUtil lineReaderUtil;
protected HdfsReaderConfiguration hdfsConfiguration;
protected StreamsConfiguration streamsConfiguration;
@@ -166,7 +165,7 @@ public class WebHdfsPersistReader implements StreamsPersistReader, DatumStatusCo
@Override
public void prepare(Object configurationObject) {
LOGGER.debug("Prepare");
- lineReaderUtil = LineReaderUtil.getInstance(hdfsConfiguration.getFields(), hdfsConfiguration.getFieldDelimiter(), hdfsConfiguration.getLineDelimiter());
+ lineReaderUtil = LineReadWriteUtil.getInstance(hdfsConfiguration.getFields(), hdfsConfiguration.getFieldDelimiter(), hdfsConfiguration.getLineDelimiter());
connectToWebHDFS();
String pathString = hdfsConfiguration.getPath() + "/" + hdfsConfiguration.getReaderPath();
LOGGER.info("Path : {}", pathString);
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5bc81c3c/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
index d6b527d..c5c1ffe 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
+++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
@@ -23,7 +23,7 @@ import com.google.common.base.Strings;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.Uninterruptibles;
import org.apache.hadoop.fs.FileStatus;
-import org.apache.streams.converter.LineReaderUtil;
+import org.apache.streams.converter.LineReadWriteUtil;
import org.apache.streams.core.DatumStatus;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.jackson.StreamsJacksonMapper;
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5bc81c3c/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
index 50bb413..34df8ae 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
+++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
@@ -30,8 +30,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.streams.config.ComponentConfigurator;
import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.converter.LineReaderUtil;
-import org.apache.streams.converter.LineWriterUtil;
+import org.apache.streams.converter.LineReadWriteUtil;
import org.apache.streams.core.*;
import org.apache.streams.jackson.StreamsJacksonMapper;
import org.joda.time.DateTime;
@@ -76,7 +75,7 @@ public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Cl
protected volatile Queue<StreamsDatum> persistQueue;
private ObjectMapper mapper;
- private LineWriterUtil lineWriterUtil;
+ private LineReadWriteUtil lineWriterUtil;
protected HdfsWriterConfiguration hdfsConfiguration;
@@ -264,7 +263,7 @@ public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Cl
@Override
public void prepare(Object configurationObject) {
mapper = StreamsJacksonMapper.getInstance();
- lineWriterUtil = LineWriterUtil.getInstance(hdfsConfiguration.getFields(), hdfsConfiguration.getFieldDelimiter(), hdfsConfiguration.getLineDelimiter());
+ lineWriterUtil = LineReadWriteUtil.getInstance(hdfsConfiguration.getFields(), hdfsConfiguration.getFieldDelimiter(), hdfsConfiguration.getLineDelimiter());
connectToWebHDFS();
path = new Path(hdfsConfiguration.getPath() + "/" + hdfsConfiguration.getWriterPath());
}