You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2015/09/04 18:43:47 UTC
[1/2] incubator-streams git commit: implements STREAMS-363 moves line
format read/write out of streams-persist-hdfs to streams-converters this is
better as it can now be used across contrib and runtime modules
Repository: incubator-streams
Updated Branches:
refs/heads/STREAMS-363 [created] 5bc81c3c3
implements STREAMS-363
moves line format read/write out of streams-persist-hdfs to streams-converters
this is better as it can now be used across contrib and runtime modules
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/30e3eafe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/30e3eafe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/30e3eafe
Branch: refs/heads/STREAMS-363
Commit: 30e3eafe6b9fa58f85ae744d40057eaa6d41871c
Parents: a8e1cc7
Author: Steve Blackmon (@steveblackmon) <sb...@apache.org>
Authored: Thu Sep 3 21:37:23 2015 -0500
Committer: Steve Blackmon (@steveblackmon) <sb...@apache.org>
Committed: Thu Sep 3 21:37:23 2015 -0500
----------------------------------------------------------------------
.../streams/converter/TypeConverterUtil.java | 2 +-
.../streams-persist-s3/pom.xml | 15 +++++
.../org/apache/streams/s3/S3PersistReader.java | 4 ++
.../apache/streams/s3/S3PersistReaderTask.java | 3 +-
.../org/apache/streams/s3/S3PersistWriter.java | 52 +++++---------
.../org/apache/streams/s3/S3Configuration.json | 20 ++++++
.../streams/s3/S3WriterConfiguration.json | 6 ++
streams-contrib/streams-persist-hdfs/pom.xml | 5 ++
.../streams/hdfs/WebHdfsPersistReader.java | 71 ++------------------
.../streams/hdfs/WebHdfsPersistReaderTask.java | 3 +-
.../streams/hdfs/WebHdfsPersistWriter.java | 66 +++---------------
11 files changed, 85 insertions(+), 162 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/30e3eafe/streams-components/streams-converters/src/main/java/org/apache/streams/converter/TypeConverterUtil.java
----------------------------------------------------------------------
diff --git a/streams-components/streams-converters/src/main/java/org/apache/streams/converter/TypeConverterUtil.java b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/TypeConverterUtil.java
index 628a5a4..4ace9c4 100644
--- a/streams-components/streams-converters/src/main/java/org/apache/streams/converter/TypeConverterUtil.java
+++ b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/TypeConverterUtil.java
@@ -32,7 +32,7 @@ import java.io.IOException;
*/
public class TypeConverterUtil {
- private final static Logger LOGGER = LoggerFactory.getLogger(ActivityConverterUtil.class);
+ private final static Logger LOGGER = LoggerFactory.getLogger(TypeConverterUtil.class);
private static final TypeConverterUtil INSTANCE = new TypeConverterUtil();
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/30e3eafe/streams-contrib/streams-amazon-aws/streams-persist-s3/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/pom.xml b/streams-contrib/streams-amazon-aws/streams-persist-s3/pom.xml
index cbfe1fd..ffc590e 100644
--- a/streams-contrib/streams-amazon-aws/streams-persist-s3/pom.xml
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/pom.xml
@@ -61,6 +61,21 @@
</dependency>
<dependency>
<groupId>org.apache.streams</groupId>
+ <artifactId>streams-persist-hdfs</artifactId>
+ <version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
<artifactId>streams-util</artifactId>
<version>${project.version}</version>
</dependency>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/30e3eafe/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
index 5709f22..e89b086 100644
--- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
@@ -31,6 +31,7 @@ import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Strings;
import com.google.common.collect.Queues;
+import org.apache.streams.converter.LineReaderUtil;
import org.apache.streams.core.*;
import org.joda.time.DateTime;
import org.slf4j.Logger;
@@ -54,6 +55,7 @@ public class S3PersistReader implements StreamsPersistReader, DatumStatusCountab
private S3ReaderConfiguration s3ReaderConfiguration;
private AmazonS3Client amazonS3Client;
private ObjectMapper mapper = new ObjectMapper();
+ protected LineReaderUtil lineReaderUtil;
private Collection<String> files;
private ExecutorService executor;
protected volatile Queue<StreamsDatum> persistQueue;
@@ -100,6 +102,8 @@ public class S3PersistReader implements StreamsPersistReader, DatumStatusCountab
}
public void prepare(Object configurationObject) {
+
+ lineReaderUtil = LineReaderUtil.getInstance(s3ReaderConfiguration.getFields(), s3ReaderConfiguration.getFieldDelimiter(), s3ReaderConfiguration.getLineDelimiter());
// Connect to S3
synchronized (this)
{
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/30e3eafe/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java
index 73763e6..f2f5567 100644
--- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java
@@ -53,8 +53,7 @@ public class S3PersistReaderTask implements Runnable {
while((line = bufferedReader.readLine()) != null) {
if( !Strings.isNullOrEmpty(line) ) {
reader.countersCurrent.incrementAttempt();
- String[] fields = line.split(Character.toString(reader.DELIMITER));
- StreamsDatum entry = new StreamsDatum(fields[3], fields[0]);
+ StreamsDatum entry = reader.lineReaderUtil.processLine(line);
ComponentUtils.offerUntilSuccess(entry, reader.persistQueue);
reader.countersCurrent.incrementStatus(DatumStatus.SUCCESS);
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/30e3eafe/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java
index 09f16ff..778b386 100644
--- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java
@@ -29,7 +29,12 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.converter.LineReaderUtil;
+import org.apache.streams.converter.LineWriterUtil;
import org.apache.streams.core.*;
+import org.apache.streams.hdfs.WebHdfsPersistWriter;
import org.apache.streams.jackson.StreamsJacksonMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -51,6 +56,7 @@ public class S3PersistWriter implements StreamsPersistWriter, DatumStatusCountab
private AmazonS3Client amazonS3Client;
private S3WriterConfiguration s3WriterConfiguration;
private final List<String> writtenFiles = new ArrayList<String>();
+ protected LineWriterUtil lineWriterUtil;
private final AtomicLong totalBytesWritten = new AtomicLong();
private AtomicLong bytesWrittenThisFile = new AtomicLong();
@@ -95,6 +101,14 @@ public class S3PersistWriter implements StreamsPersistWriter, DatumStatusCountab
this.objectMetaData = val;
}
+ public S3PersistWriter() {
+ this(new ComponentConfigurator<>(S3WriterConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig().getConfig("s3")));
+ }
+
+ public S3PersistWriter(S3WriterConfiguration s3WriterConfiguration) {
+ this.s3WriterConfiguration = s3WriterConfiguration;
+ }
+
/**
* Instantiator with a pre-existing amazonS3Client, this is used to help with re-use.
* @param amazonS3Client
@@ -107,10 +121,6 @@ public class S3PersistWriter implements StreamsPersistWriter, DatumStatusCountab
this.s3WriterConfiguration = s3WriterConfiguration;
}
- public S3PersistWriter(S3WriterConfiguration s3WriterConfiguration) {
- this.s3WriterConfiguration = s3WriterConfiguration;
- }
-
@Override
public void write(StreamsDatum streamsDatum) {
@@ -125,7 +135,7 @@ public class S3PersistWriter implements StreamsPersistWriter, DatumStatusCountab
}
}
- String line = convertResultToString(streamsDatum);
+ String line = lineWriterUtil.convertResultToString(streamsDatum);
try {
this.currentWriter.write(line);
@@ -145,7 +155,7 @@ public class S3PersistWriter implements StreamsPersistWriter, DatumStatusCountab
}
- private synchronized OutputStreamWriter resetFile() throws Exception {
+ public synchronized OutputStreamWriter resetFile() throws Exception {
// this will keep it thread safe, so we don't create too many files
if(this.fileLineCounter.get() == 0 && this.currentWriter != null)
return this.currentWriter;
@@ -218,36 +228,10 @@ public class S3PersistWriter implements StreamsPersistWriter, DatumStatusCountab
}
}
- private String convertResultToString(StreamsDatum entry)
- {
- String metadata = null;
-
- try {
- metadata = objectMapper.writeValueAsString(entry.getMetadata());
- } catch (JsonProcessingException e) {
- e.printStackTrace();
- }
-
- String documentJson = null;
- try {
- documentJson = objectMapper.writeValueAsString(entry.getDocument());
- } catch (JsonProcessingException e) {
- e.printStackTrace();
- }
+ public void prepare(Object configurationObject) {
- // Save the class name that it came from
- entry.metadata.put("class", entry.getDocument().getClass().getName());
+ lineWriterUtil = LineWriterUtil.getInstance(s3WriterConfiguration.getFields(), s3WriterConfiguration.getFieldDelimiter(), s3WriterConfiguration.getLineDelimiter());
- if(Strings.isNullOrEmpty(documentJson))
- return null;
- else
- return entry.getId() + DELIMITER + // [0] = Unique id of the verbatim
- entry.getTimestamp() + DELIMITER + // [1] = Timestamp of the item
- metadata + DELIMITER + // [2] = Metadata of the item
- documentJson + "\n"; // [3] = The actual object
- }
-
- public void prepare(Object configurationObject) {
// Connect to S3
synchronized (this) {
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/30e3eafe/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/jsonschema/org/apache/streams/s3/S3Configuration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/jsonschema/org/apache/streams/s3/S3Configuration.json b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/jsonschema/org/apache/streams/s3/S3Configuration.json
index ae696df..7f2e9e5 100644
--- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/jsonschema/org/apache/streams/s3/S3Configuration.json
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/jsonschema/org/apache/streams/s3/S3Configuration.json
@@ -33,6 +33,26 @@
"type": "string",
"description": "The AWS region where your bucket resides",
"required": false
+ },
+ "fields": {
+ "type": "array",
+ "items": {
+ "type": "string"
+ },
+ "default": [
+ "ID",
+ "TS",
+ "META",
+ "DOC"
+ ]
+ },
+ "field_delimiter": {
+ "type": "string",
+ "default": "\t"
+ },
+ "line_delimiter": {
+ "type": "string",
+ "default": "\n"
}
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/30e3eafe/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/jsonschema/org/apache/streams/s3/S3WriterConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/jsonschema/org/apache/streams/s3/S3WriterConfiguration.json b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/jsonschema/org/apache/streams/s3/S3WriterConfiguration.json
index 58d3055..b64a470 100644
--- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/jsonschema/org/apache/streams/s3/S3WriterConfiguration.json
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/jsonschema/org/apache/streams/s3/S3WriterConfiguration.json
@@ -26,6 +26,12 @@
"type": "boolean",
"default" : true,
"description": "Whether you want the file chunked inside of a folder or not"
+ },
+ "compression": {
+ "type": "string",
+ "description": "compression",
+ "enum" : ["none", "gzip"],
+ "default": "none"
}
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/30e3eafe/streams-contrib/streams-persist-hdfs/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/pom.xml b/streams-contrib/streams-persist-hdfs/pom.xml
index 8d19141..b0b48cd 100644
--- a/streams-contrib/streams-persist-hdfs/pom.xml
+++ b/streams-contrib/streams-persist-hdfs/pom.xml
@@ -63,6 +63,11 @@
<optional>true</optional>
</dependency>
<dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-converters</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hdfs.version}</version>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/30e3eafe/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
index 72f3157..983740d 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
+++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
@@ -33,6 +33,8 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.streams.config.ComponentConfigurator;
import org.apache.streams.config.StreamsConfiguration;
import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.converter.LineReaderUtil;
+import org.apache.streams.converter.LineWriterUtil;
import org.apache.streams.core.*;
import org.apache.streams.jackson.StreamsJacksonMapper;
import org.joda.time.DateTime;
@@ -71,6 +73,7 @@ public class WebHdfsPersistReader implements StreamsPersistReader, DatumStatusCo
protected volatile Queue<StreamsDatum> persistQueue;
protected ObjectMapper mapper;
+ protected LineReaderUtil lineReaderUtil;
protected HdfsReaderConfiguration hdfsConfiguration;
protected StreamsConfiguration streamsConfiguration;
@@ -163,6 +166,7 @@ public class WebHdfsPersistReader implements StreamsPersistReader, DatumStatusCo
@Override
public void prepare(Object configurationObject) {
LOGGER.debug("Prepare");
+ lineReaderUtil = LineReaderUtil.getInstance(hdfsConfiguration.getFields(), hdfsConfiguration.getFieldDelimiter(), hdfsConfiguration.getLineDelimiter());
connectToWebHDFS();
String pathString = hdfsConfiguration.getPath() + "/" + hdfsConfiguration.getReaderPath();
LOGGER.info("Path : {}", pathString);
@@ -231,73 +235,6 @@ public class WebHdfsPersistReader implements StreamsPersistReader, DatumStatusCo
return current;
}
- public StreamsDatum processLine(String line) {
-
- List<String> expectedFields = hdfsConfiguration.getFields();
- String[] parsedFields = line.split(hdfsConfiguration.getFieldDelimiter());
-
- if( parsedFields.length == 0)
- return null;
-
- String id = null;
- DateTime ts = null;
- Map<String, Object> metadata = null;
- String json = null;
-
- if( expectedFields.contains( HdfsConstants.DOC )
- && parsedFields.length > expectedFields.indexOf(HdfsConstants.DOC)) {
- json = parsedFields[expectedFields.indexOf(HdfsConstants.DOC)];
- }
-
- if( expectedFields.contains( HdfsConstants.ID )
- && parsedFields.length > expectedFields.indexOf(HdfsConstants.ID)) {
- id = parsedFields[expectedFields.indexOf(HdfsConstants.ID)];
- }
- if( expectedFields.contains( HdfsConstants.TS )
- && parsedFields.length > expectedFields.indexOf(HdfsConstants.TS)) {
- ts = parseTs(parsedFields[expectedFields.indexOf(HdfsConstants.TS)]);
- }
- if( expectedFields.contains( HdfsConstants.META )
- && parsedFields.length > expectedFields.indexOf(HdfsConstants.META)) {
- metadata = parseMap(parsedFields[expectedFields.indexOf(HdfsConstants.META)]);
- }
-
- StreamsDatum datum = new StreamsDatum(json);
- datum.setId(id);
- datum.setTimestamp(ts);
- datum.setMetadata(metadata);
-
- return datum;
-
- }
-
- public DateTime parseTs(String field) {
-
- DateTime timestamp = null;
- try {
- long longts = Long.parseLong(field);
- timestamp = new DateTime(longts);
- } catch ( Exception e ) {}
- try {
- timestamp = mapper.readValue(field, DateTime.class);
- } catch ( Exception e ) {}
-
- return timestamp;
- }
-
- public Map<String, Object> parseMap(String field) {
-
- Map<String, Object> metadata = null;
-
- try {
- JsonNode jsonNode = mapper.readValue(field, JsonNode.class);
- metadata = mapper.convertValue(jsonNode, Map.class);
- } catch (IOException e) {
- LOGGER.warn("failed in parseMap: " + e.getMessage());
- }
- return metadata;
- }
-
protected void write( StreamsDatum entry ) {
boolean success;
do {
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/30e3eafe/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
index 9525131..d6b527d 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
+++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
@@ -23,6 +23,7 @@ import com.google.common.base.Strings;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.Uninterruptibles;
import org.apache.hadoop.fs.FileStatus;
+import org.apache.streams.converter.LineReaderUtil;
import org.apache.streams.core.DatumStatus;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.jackson.StreamsJacksonMapper;
@@ -83,7 +84,7 @@ public class WebHdfsPersistReaderTask implements Runnable {
line = bufferedReader.readLine();
if( !Strings.isNullOrEmpty(line) ) {
reader.countersCurrent.incrementAttempt();
- StreamsDatum entry = reader.processLine(line);
+ StreamsDatum entry = reader.lineReaderUtil.processLine(line);
if( entry != null ) {
reader.write(entry);
reader.countersCurrent.incrementStatus(DatumStatus.SUCCESS);
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/30e3eafe/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
index 33598c2..50bb413 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
+++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
@@ -27,11 +27,13 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.streams.config.ComponentConfigurator;
import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.converter.LineReaderUtil;
+import org.apache.streams.converter.LineWriterUtil;
import org.apache.streams.core.*;
+import org.apache.streams.jackson.StreamsJacksonMapper;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -51,16 +53,13 @@ import java.util.Queue;
import java.util.zip.GZIPOutputStream;
public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Closeable, DatumStatusCountable {
+
public final static String STREAMS_ID = "WebHdfsPersistWriter";
private final static Logger LOGGER = LoggerFactory.getLogger(WebHdfsPersistWriter.class);
- private final static char DELIMITER = '\t';
- private final static int DEFAULT_LINES_PER_FILE = 50000;
-
private FileSystem client;
private Path path;
- private String filePart = "default";
private int linesPerFile;
private int totalRecordsWritten = 0;
private final List<Path> writtenFiles = new ArrayList<Path>();
@@ -76,7 +75,8 @@ public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Cl
protected volatile Queue<StreamsDatum> persistQueue;
- private ObjectMapper mapper = new ObjectMapper();
+ private ObjectMapper mapper;
+ private LineWriterUtil lineWriterUtil;
protected HdfsWriterConfiguration hdfsConfiguration;
@@ -160,7 +160,7 @@ public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Cl
if (this.currentWriter == null || (this.fileLineCounter > this.linesPerFile))
resetFile();
- String line = convertResultToString(streamsDatum);
+ String line = lineWriterUtil.convertResultToString(streamsDatum);
writeInternal(line);
int bytesInLine = line.getBytes().length;
@@ -261,58 +261,10 @@ public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Cl
}
}
- public String convertResultToString(StreamsDatum entry) {
- String metadataJson = null;
- try {
- metadataJson = mapper.writeValueAsString(entry.getMetadata());
- } catch (JsonProcessingException e) {
- LOGGER.warn("Error converting metadata to a string", e);
- }
-
- String documentJson = null;
- try {
- if( entry.getDocument() instanceof String )
- documentJson = (String)entry.getDocument();
- else
- documentJson = mapper.writeValueAsString(entry.getDocument());
- } catch (JsonProcessingException e) {
- LOGGER.warn("Error converting document to string", e);
- }
-
- if (Strings.isNullOrEmpty(documentJson))
- return null;
- else {
- StringBuilder stringBuilder = new StringBuilder();
- Iterator<String> fields = hdfsConfiguration.getFields().iterator();
- List<String> fielddata = Lists.newArrayList();
- Joiner joiner = Joiner.on(hdfsConfiguration.getFieldDelimiter()).useForNull("");
- while( fields.hasNext() ) {
- String field = fields.next();
- if( field.equals(HdfsConstants.DOC) )
- fielddata.add(documentJson);
- else if( field.equals(HdfsConstants.ID) )
- fielddata.add(entry.getId());
- else if( field.equals(HdfsConstants.TS) )
- if( entry.getTimestamp() != null )
- fielddata.add(entry.getTimestamp().toString());
- else
- fielddata.add(DateTime.now().toString());
- else if( field.equals(HdfsConstants.META) )
- fielddata.add(metadataJson);
- else if( entry.getMetadata().containsKey(field)) {
- fielddata.add(entry.getMetadata().get(field).toString());
- } else {
- fielddata.add(null);
- }
-
- }
- joiner.appendTo(stringBuilder, fielddata);
- return stringBuilder.append(hdfsConfiguration.getLineDelimiter()).toString();
- }
- }
-
@Override
public void prepare(Object configurationObject) {
+ mapper = StreamsJacksonMapper.getInstance();
+ lineWriterUtil = LineWriterUtil.getInstance(hdfsConfiguration.getFields(), hdfsConfiguration.getFieldDelimiter(), hdfsConfiguration.getLineDelimiter());
connectToWebHDFS();
path = new Path(hdfsConfiguration.getPath() + "/" + hdfsConfiguration.getWriterPath());
}
[2/2] incubator-streams git commit: consolidated two Util classes to
one adds testing
Posted by sb...@apache.org.
consolidated two Util classes to one
adds testing
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/5bc81c3c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/5bc81c3c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/5bc81c3c
Branch: refs/heads/STREAMS-363
Commit: 5bc81c3c379557370b1b65f56181d2e868925836
Parents: 30e3eaf
Author: Steve Blackmon (@steveblackmon) <sb...@apache.org>
Authored: Fri Sep 4 11:43:42 2015 -0500
Committer: Steve Blackmon (@steveblackmon) <sb...@apache.org>
Committed: Fri Sep 4 11:43:42 2015 -0500
----------------------------------------------------------------------
.../streams/converter/FieldConstants.java | 32 +++
.../streams/converter/LineReadWriteUtil.java | 227 +++++++++++++++++++
.../converter/test/TestLineReadWriteUtil.java | 112 +++++++++
.../org/apache/streams/s3/S3PersistReader.java | 6 +-
.../org/apache/streams/s3/S3PersistWriter.java | 7 +-
.../streams/hdfs/WebHdfsPersistReader.java | 7 +-
.../streams/hdfs/WebHdfsPersistReaderTask.java | 2 +-
.../streams/hdfs/WebHdfsPersistWriter.java | 7 +-
8 files changed, 384 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5bc81c3c/streams-components/streams-converters/src/main/java/org/apache/streams/converter/FieldConstants.java
----------------------------------------------------------------------
diff --git a/streams-components/streams-converters/src/main/java/org/apache/streams/converter/FieldConstants.java b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/FieldConstants.java
new file mode 100644
index 0000000..26dfcb3
--- /dev/null
+++ b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/FieldConstants.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.converter;
+
+/**
+ * Predefined field symbols
+ */
+public class FieldConstants {
+
+ protected static final String ID = "ID";
+ protected static final String SEQ = "SEQ";
+ protected static final String TS = "TS";
+ protected static final String META = "META";
+ protected static final String DOC = "DOC";
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5bc81c3c/streams-components/streams-converters/src/main/java/org/apache/streams/converter/LineReadWriteUtil.java
----------------------------------------------------------------------
diff --git a/streams-components/streams-converters/src/main/java/org/apache/streams/converter/LineReadWriteUtil.java b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/LineReadWriteUtil.java
new file mode 100644
index 0000000..6ec1899
--- /dev/null
+++ b/streams-components/streams-converters/src/main/java/org/apache/streams/converter/LineReadWriteUtil.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.converter;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Joiner;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.math.BigInteger;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * LineReadWriteUtil converts Datums to/from character array appropriate for writing to
+ * file systems.
+ */
+public class LineReadWriteUtil {
+
+ private final static Logger LOGGER = LoggerFactory.getLogger(TypeConverterUtil.class);
+
+ private static final LineReadWriteUtil INSTANCE = new LineReadWriteUtil();
+
+ private final static List<String> DEFAULT_FIELDS = Lists.newArrayList("ID", "SEQ", "TS", "META", "DOC");
+ private List<String> fields = DEFAULT_FIELDS;
+ private String fieldDelimiter = "\t";
+ private String lineDelimiter = "\n";
+
+ private static ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+
+ private LineReadWriteUtil() {
+ }
+
+ private LineReadWriteUtil(List<String> fields) {
+ if( fields.size() > 0) this.fields = fields;
+ }
+
+ private LineReadWriteUtil(List<String> fields, String fieldDelimiter) {
+ this(fields);
+ if( fieldDelimiter != null ) this.fieldDelimiter = fieldDelimiter;
+ }
+
+ private LineReadWriteUtil(List<String> fields, String fieldDelimiter, String lineDelimiter) {
+ this(fields);
+ if( fieldDelimiter != null ) this.fieldDelimiter = fieldDelimiter;
+ if( lineDelimiter != null ) this.lineDelimiter = lineDelimiter;
+ }
+
+ public static LineReadWriteUtil getInstance(){
+ return INSTANCE;
+ }
+
+ public static LineReadWriteUtil getInstance(List<String> fields){
+ return new LineReadWriteUtil(fields);
+ }
+
+ public static LineReadWriteUtil getInstance(List<String> fields, String fieldDelimiter){
+ return new LineReadWriteUtil(fields, fieldDelimiter);
+ }
+
+ public static LineReadWriteUtil getInstance(List<String> fields, String fieldDelimiter, String lineDelimiter){
+ return new LineReadWriteUtil(fields, fieldDelimiter, lineDelimiter);
+ }
+
+ public StreamsDatum processLine(String line) {
+
+ List<String> expectedFields = fields;
+ String[] parsedFields = line.split(fieldDelimiter);
+
+ if( parsedFields.length == 0)
+ return null;
+
+ String id = null;
+ DateTime ts = null;
+ BigInteger seq = null;
+ Map<String, Object> metadata = null;
+ String json = null;
+
+ if( expectedFields.contains( FieldConstants.DOC )
+ && parsedFields.length > expectedFields.indexOf(FieldConstants.DOC)) {
+ json = parsedFields[expectedFields.indexOf(FieldConstants.DOC)];
+ }
+
+ if( expectedFields.contains( FieldConstants.ID )
+ && parsedFields.length > expectedFields.indexOf(FieldConstants.ID)) {
+ id = parsedFields[expectedFields.indexOf(FieldConstants.ID)];
+ }
+ if( expectedFields.contains( FieldConstants.SEQ )
+ && parsedFields.length > expectedFields.indexOf(FieldConstants.SEQ)) {
+ seq = new BigInteger(parsedFields[expectedFields.indexOf(FieldConstants.SEQ)]);
+ }
+ if( expectedFields.contains( FieldConstants.TS )
+ && parsedFields.length > expectedFields.indexOf(FieldConstants.TS)) {
+ ts = parseTs(parsedFields[expectedFields.indexOf(FieldConstants.TS)]);
+ }
+ if( expectedFields.contains( FieldConstants.META )
+ && parsedFields.length > expectedFields.indexOf(FieldConstants.META)) {
+ metadata = parseMap(parsedFields[expectedFields.indexOf(FieldConstants.META)]);
+ }
+
+ StreamsDatum datum = new StreamsDatum(trimLineDelimiter(json));
+ datum.setId(id);
+ datum.setTimestamp(ts);
+ datum.setMetadata(metadata);
+ datum.setSequenceid(seq);
+ return datum;
+
+ }
+
+ public String convertResultToString(StreamsDatum entry) {
+ String metadataJson = null;
+ try {
+ metadataJson = MAPPER.writeValueAsString(entry.getMetadata());
+ } catch (JsonProcessingException e) {
+ LOGGER.warn("Error converting metadata to a string", e);
+ }
+
+ String documentJson = null;
+ try {
+ if( entry.getDocument() instanceof String )
+ documentJson = (String)entry.getDocument();
+ else
+ documentJson = MAPPER.writeValueAsString(entry.getDocument());
+ } catch (JsonProcessingException e) {
+ LOGGER.warn("Error converting document to string", e);
+ }
+
+ if (Strings.isNullOrEmpty(documentJson))
+ return null;
+ else {
+ StringBuilder stringBuilder = new StringBuilder();
+ Iterator<String> fields = this.fields.iterator();
+ List<String> fielddata = Lists.newArrayList();
+ Joiner joiner = Joiner.on(fieldDelimiter).useForNull("");
+ while( fields.hasNext() ) {
+ String field = fields.next();
+ if( field.equals(FieldConstants.DOC) )
+ fielddata.add(documentJson);
+ else if( field.equals(FieldConstants.ID) )
+ fielddata.add(entry.getId());
+ else if( field.equals(FieldConstants.SEQ) )
+ fielddata.add(entry.getSequenceid().toString());
+ else if( field.equals(FieldConstants.TS) )
+ if( entry.getTimestamp() != null )
+ fielddata.add(entry.getTimestamp().toString());
+ else
+ fielddata.add(DateTime.now().toString());
+ else if( field.equals(FieldConstants.META) )
+ fielddata.add(metadataJson);
+ else if( entry.getMetadata().containsKey(field)) {
+ fielddata.add(entry.getMetadata().get(field).toString());
+ } else {
+ fielddata.add(null);
+ }
+
+ }
+ joiner.appendTo(stringBuilder, fielddata);
+ return stringBuilder.append(lineDelimiter).toString();
+ }
+ }
+
+ public DateTime parseTs(String field) {
+
+ DateTime timestamp = null;
+ try {
+ long longts = Long.parseLong(field);
+ timestamp = new DateTime(longts);
+ } catch ( Exception e ) {
+ try {
+ timestamp = DateTime.parse(field);
+ } catch ( Exception e2 ) {
+ try {
+ timestamp = MAPPER.readValue(field, DateTime.class);
+ } catch ( Exception e3 ) {
+ LOGGER.warn("Could not parse timestamp:{} ", field);
+ }
+ }
+ }
+
+ return timestamp;
+ }
+
+ public Map<String, Object> parseMap(String field) {
+
+ Map<String, Object> metadata = null;
+
+ try {
+ JsonNode jsonNode = MAPPER.readValue(field, JsonNode.class);
+ metadata = MAPPER.convertValue(jsonNode, Map.class);
+ } catch (IOException e) {
+ LOGGER.warn("failed in parseMap: " + e.getMessage());
+ }
+ return metadata;
+ }
+
+ private String trimLineDelimiter(String str) {
+ if( !Strings.isNullOrEmpty(str))
+ if( str.endsWith(lineDelimiter))
+ return str.substring(0,str.length()-1);
+ return str;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5bc81c3c/streams-components/streams-converters/src/test/java/org/apache/streams/converter/test/TestLineReadWriteUtil.java
----------------------------------------------------------------------
diff --git a/streams-components/streams-converters/src/test/java/org/apache/streams/converter/test/TestLineReadWriteUtil.java b/streams-components/streams-converters/src/test/java/org/apache/streams/converter/test/TestLineReadWriteUtil.java
new file mode 100644
index 0000000..463b88d
--- /dev/null
+++ b/streams-components/streams-converters/src/test/java/org/apache/streams/converter/test/TestLineReadWriteUtil.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.converter.test;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import junit.framework.Assert;
+import org.apache.commons.io.Charsets;
+import org.apache.commons.io.IOUtils;
+import org.apache.streams.converter.LineReadWriteUtil;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+import org.joda.time.DateTime;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.InputStream;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+
+/**
+ * Tests for
+ * @see {@link LineReadWriteUtil}
+ */
+public class TestLineReadWriteUtil {
+
+ private final static Logger LOGGER = LoggerFactory.getLogger(TestLineReadWriteUtil.class);
+
+ ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+
+ private static Random rand = new Random();
+
+ @Test
+ public void TestLineReadWrite () throws Exception {
+
+ List<List<String>> fieldArrays = Lists.newArrayList();
+ fieldArrays.add(new ArrayList<String>());
+ fieldArrays.add(Lists.newArrayList("ID"));
+ fieldArrays.add(Lists.newArrayList("DOC"));
+ fieldArrays.add(Lists.newArrayList("ID", "DOC"));
+ fieldArrays.add(Lists.newArrayList("ID", "TS", "DOC"));
+ fieldArrays.add(Lists.newArrayList("ID", "TS", "META", "DOC"));
+ fieldArrays.add(Lists.newArrayList("ID", "SEQ", "TS", "META", "DOC"));
+
+ TestLineReadWriteCase(fieldArrays.get(0), null, null);
+ TestLineReadWriteCase(fieldArrays.get(1), "\t", null );
+ TestLineReadWriteCase(fieldArrays.get(2), "\t", "\n" );
+ TestLineReadWriteCase(fieldArrays.get(3), null, "\n" );
+ TestLineReadWriteCase(fieldArrays.get(4), "|", "\n" );
+ TestLineReadWriteCase(fieldArrays.get(5), "|", "\\0" );
+
+ }
+
+ public void TestLineReadWriteCase(List<String> fields, String fieldDelimiter, String lineDelimiter) throws Exception {
+
+ LineReadWriteUtil lineReadWriteUtil;
+ if( lineDelimiter != null && fieldDelimiter != null ) lineReadWriteUtil = LineReadWriteUtil.getInstance(fields, fieldDelimiter, lineDelimiter);
+ else if( lineDelimiter != null ) lineReadWriteUtil = LineReadWriteUtil.getInstance(fields, null, lineDelimiter);
+ else if( fieldDelimiter != null ) lineReadWriteUtil = LineReadWriteUtil.getInstance(fields, fieldDelimiter);
+ lineReadWriteUtil = LineReadWriteUtil.getInstance(fields);
+
+ StreamsDatum testDatum = randomDatum();
+ String writeResult = lineReadWriteUtil.convertResultToString(testDatum);
+ assert !Strings.isNullOrEmpty(writeResult);
+ StreamsDatum readResult = lineReadWriteUtil.processLine(writeResult);
+ assert readResult != null;
+ assert !Strings.isNullOrEmpty(readResult.getId()) || !Strings.isNullOrEmpty((String)readResult.getDocument());
+ if( fields.size() == 5 )
+ Assert.assertEquals(testDatum.toString(), readResult.toString());
+
+ }
+
+ public static StreamsDatum randomDatum() {
+
+ StreamsDatum datum = new StreamsDatum(UUID.randomUUID().toString());
+ datum.setId(UUID.randomUUID().toString());
+ datum.setTimestamp(DateTime.now());
+ BigInteger result = new BigInteger(64, rand);
+ datum.setSequenceid(result);
+ Map<String,Object> metadata = Maps.newHashMap();
+ metadata.put("a", UUID.randomUUID().toString());
+ datum.setMetadata(metadata);
+ return datum;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5bc81c3c/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
index e89b086..6efa150 100644
--- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
@@ -31,7 +31,7 @@ import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Strings;
import com.google.common.collect.Queues;
-import org.apache.streams.converter.LineReaderUtil;
+import org.apache.streams.converter.LineReadWriteUtil;
import org.apache.streams.core.*;
import org.joda.time.DateTime;
import org.slf4j.Logger;
@@ -55,7 +55,7 @@ public class S3PersistReader implements StreamsPersistReader, DatumStatusCountab
private S3ReaderConfiguration s3ReaderConfiguration;
private AmazonS3Client amazonS3Client;
private ObjectMapper mapper = new ObjectMapper();
- protected LineReaderUtil lineReaderUtil;
+ protected LineReadWriteUtil lineReaderUtil;
private Collection<String> files;
private ExecutorService executor;
protected volatile Queue<StreamsDatum> persistQueue;
@@ -103,7 +103,7 @@ public class S3PersistReader implements StreamsPersistReader, DatumStatusCountab
public void prepare(Object configurationObject) {
- lineReaderUtil = LineReaderUtil.getInstance(s3ReaderConfiguration.getFields(), s3ReaderConfiguration.getFieldDelimiter(), s3ReaderConfiguration.getLineDelimiter());
+ lineReaderUtil = LineReadWriteUtil.getInstance(s3ReaderConfiguration.getFields(), s3ReaderConfiguration.getFieldDelimiter(), s3ReaderConfiguration.getLineDelimiter());
// Connect to S3
synchronized (this)
{
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5bc81c3c/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java
index 778b386..c5b041b 100644
--- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java
@@ -31,8 +31,7 @@ import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import org.apache.streams.config.ComponentConfigurator;
import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.converter.LineReaderUtil;
-import org.apache.streams.converter.LineWriterUtil;
+import org.apache.streams.converter.LineReadWriteUtil;
import org.apache.streams.core.*;
import org.apache.streams.hdfs.WebHdfsPersistWriter;
import org.apache.streams.jackson.StreamsJacksonMapper;
@@ -56,7 +55,7 @@ public class S3PersistWriter implements StreamsPersistWriter, DatumStatusCountab
private AmazonS3Client amazonS3Client;
private S3WriterConfiguration s3WriterConfiguration;
private final List<String> writtenFiles = new ArrayList<String>();
- protected LineWriterUtil lineWriterUtil;
+ protected LineReadWriteUtil lineWriterUtil;
private final AtomicLong totalBytesWritten = new AtomicLong();
private AtomicLong bytesWrittenThisFile = new AtomicLong();
@@ -230,7 +229,7 @@ public class S3PersistWriter implements StreamsPersistWriter, DatumStatusCountab
public void prepare(Object configurationObject) {
- lineWriterUtil = LineWriterUtil.getInstance(s3WriterConfiguration.getFields(), s3WriterConfiguration.getFieldDelimiter(), s3WriterConfiguration.getLineDelimiter());
+ lineWriterUtil = LineReadWriteUtil.getInstance(s3WriterConfiguration.getFields(), s3WriterConfiguration.getFieldDelimiter(), s3WriterConfiguration.getLineDelimiter());
// Connect to S3
synchronized (this) {
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5bc81c3c/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
index 983740d..0937bf2 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
+++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
@@ -33,8 +33,7 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.streams.config.ComponentConfigurator;
import org.apache.streams.config.StreamsConfiguration;
import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.converter.LineReaderUtil;
-import org.apache.streams.converter.LineWriterUtil;
+import org.apache.streams.converter.LineReadWriteUtil;
import org.apache.streams.core.*;
import org.apache.streams.jackson.StreamsJacksonMapper;
import org.joda.time.DateTime;
@@ -73,7 +72,7 @@ public class WebHdfsPersistReader implements StreamsPersistReader, DatumStatusCo
protected volatile Queue<StreamsDatum> persistQueue;
protected ObjectMapper mapper;
- protected LineReaderUtil lineReaderUtil;
+ protected LineReadWriteUtil lineReaderUtil;
protected HdfsReaderConfiguration hdfsConfiguration;
protected StreamsConfiguration streamsConfiguration;
@@ -166,7 +165,7 @@ public class WebHdfsPersistReader implements StreamsPersistReader, DatumStatusCo
@Override
public void prepare(Object configurationObject) {
LOGGER.debug("Prepare");
- lineReaderUtil = LineReaderUtil.getInstance(hdfsConfiguration.getFields(), hdfsConfiguration.getFieldDelimiter(), hdfsConfiguration.getLineDelimiter());
+ lineReaderUtil = LineReadWriteUtil.getInstance(hdfsConfiguration.getFields(), hdfsConfiguration.getFieldDelimiter(), hdfsConfiguration.getLineDelimiter());
connectToWebHDFS();
String pathString = hdfsConfiguration.getPath() + "/" + hdfsConfiguration.getReaderPath();
LOGGER.info("Path : {}", pathString);
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5bc81c3c/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
index d6b527d..c5c1ffe 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
+++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
@@ -23,7 +23,7 @@ import com.google.common.base.Strings;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.Uninterruptibles;
import org.apache.hadoop.fs.FileStatus;
-import org.apache.streams.converter.LineReaderUtil;
+import org.apache.streams.converter.LineReadWriteUtil;
import org.apache.streams.core.DatumStatus;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.jackson.StreamsJacksonMapper;
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5bc81c3c/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
index 50bb413..34df8ae 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
+++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
@@ -30,8 +30,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.streams.config.ComponentConfigurator;
import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.converter.LineReaderUtil;
-import org.apache.streams.converter.LineWriterUtil;
+import org.apache.streams.converter.LineReadWriteUtil;
import org.apache.streams.core.*;
import org.apache.streams.jackson.StreamsJacksonMapper;
import org.joda.time.DateTime;
@@ -76,7 +75,7 @@ public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Cl
protected volatile Queue<StreamsDatum> persistQueue;
private ObjectMapper mapper;
- private LineWriterUtil lineWriterUtil;
+ private LineReadWriteUtil lineWriterUtil;
protected HdfsWriterConfiguration hdfsConfiguration;
@@ -264,7 +263,7 @@ public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Cl
@Override
public void prepare(Object configurationObject) {
mapper = StreamsJacksonMapper.getInstance();
- lineWriterUtil = LineWriterUtil.getInstance(hdfsConfiguration.getFields(), hdfsConfiguration.getFieldDelimiter(), hdfsConfiguration.getLineDelimiter());
+ lineWriterUtil = LineReadWriteUtil.getInstance(hdfsConfiguration.getFields(), hdfsConfiguration.getFieldDelimiter(), hdfsConfiguration.getLineDelimiter());
connectToWebHDFS();
path = new Path(hdfsConfiguration.getPath() + "/" + hdfsConfiguration.getWriterPath());
}