You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2016/06/01 17:53:19 UTC
[03/46] incubator-streams git commit: refactor persist-hdfs and
persist-s3 for STREAMS-377
refactor persist-hdfs and persist-s3 for STREAMS-377
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/19f69e5a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/19f69e5a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/19f69e5a
Branch: refs/heads/STREAMS-389
Commit: 19f69e5ae37aead22f033d720f8cfb84ad20bb91
Parents: 1b2891d
Author: Steve Blackmon (@steveblackmon) <sb...@apache.org>
Authored: Sun Dec 13 23:27:05 2015 -0600
Committer: Steve Blackmon (@steveblackmon) <sb...@apache.org>
Committed: Sun Dec 13 23:27:05 2015 -0600
----------------------------------------------------------------------
.../org/apache/streams/s3/S3PersistReader.java | 8 ++++--
.../org/apache/streams/s3/S3PersistWriter.java | 22 +++++++++++----
.../org/apache/streams/s3/S3Configuration.json | 23 ++--------------
.../streams/hdfs/WebHdfsPersistReader.java | 13 +++++----
.../streams/hdfs/WebHdfsPersistWriter.java | 15 +++++-----
.../apache/streams/hdfs/HdfsConfiguration.json | 29 +++-----------------
6 files changed, 44 insertions(+), 66 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/19f69e5a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
index b186288..702df71 100644
--- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
@@ -32,7 +32,11 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Strings;
import com.google.common.collect.Queues;
import org.apache.streams.converter.LineReadWriteUtil;
-import org.apache.streams.core.*;
+import org.apache.streams.core.DatumStatusCountable;
+import org.apache.streams.core.DatumStatusCounter;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsPersistReader;
+import org.apache.streams.core.StreamsResultSet;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -108,7 +112,7 @@ public class S3PersistReader implements StreamsPersistReader, DatumStatusCountab
public void prepare(Object configurationObject) {
- lineReaderUtil = LineReadWriteUtil.getInstance(s3ReaderConfiguration.getFields(), s3ReaderConfiguration.getFieldDelimiter(), s3ReaderConfiguration.getLineDelimiter());
+ lineReaderUtil = LineReadWriteUtil.getInstance(s3ReaderConfiguration);
// Connect to S3
synchronized (this)
{
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/19f69e5a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java
index e426983..3686f55 100644
--- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java
@@ -25,21 +25,31 @@ import com.amazonaws.regions.Region;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.S3ClientOptions;
-import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import org.apache.streams.config.ComponentConfigurator;
import org.apache.streams.config.StreamsConfigurator;
import org.apache.streams.converter.LineReadWriteUtil;
-import org.apache.streams.core.*;
-import org.apache.streams.hdfs.WebHdfsPersistWriter;
+import org.apache.streams.core.DatumStatus;
+import org.apache.streams.core.DatumStatusCountable;
+import org.apache.streams.core.DatumStatusCounter;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsPersistWriter;
import org.apache.streams.jackson.StreamsJacksonMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.*;
-import java.util.*;
+import java.io.Flushable;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -234,7 +244,7 @@ public class S3PersistWriter implements StreamsPersistWriter, DatumStatusCountab
public void prepare(Object configurationObject) {
- lineWriterUtil = LineReadWriteUtil.getInstance(s3WriterConfiguration.getFields(), s3WriterConfiguration.getFieldDelimiter(), s3WriterConfiguration.getLineDelimiter());
+ lineWriterUtil = LineReadWriteUtil.getInstance(s3WriterConfiguration);
// Connect to S3
synchronized (this) {
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/19f69e5a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/jsonschema/org/apache/streams/s3/S3Configuration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/jsonschema/org/apache/streams/s3/S3Configuration.json b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/jsonschema/org/apache/streams/s3/S3Configuration.json
index 7f2e9e5..403bfac 100644
--- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/jsonschema/org/apache/streams/s3/S3Configuration.json
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/jsonschema/org/apache/streams/s3/S3Configuration.json
@@ -7,6 +7,9 @@
"type": "object",
"javaType" : "org.apache.streams.s3.S3Configuration",
"javaInterfaces": ["java.io.Serializable"],
+ "extends": {
+ "$ref": "../../../../../../../../../../streams-components/streams-converters/src/main/jsonschema/org/apache/streams/converter/LineReadWriteConfiguration.json"
+ },
"properties": {
"key": {
"type": "string",
@@ -33,26 +36,6 @@
"type": "string",
"description": "The AWS region where your bucket resides",
"required": false
- },
- "fields": {
- "type": "array",
- "items": {
- "type": "string"
- },
- "default": [
- "ID",
- "TS",
- "META",
- "DOC"
- ]
- },
- "field_delimiter": {
- "type": "string",
- "default": "\t"
- },
- "line_delimiter": {
- "type": "string",
- "default": "\n"
}
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/19f69e5a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
index ffdeb4c..24c9737 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
+++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
@@ -18,7 +18,6 @@
package org.apache.streams.hdfs;
-import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
@@ -28,13 +27,16 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.streams.config.ComponentConfigurator;
import org.apache.streams.config.StreamsConfiguration;
import org.apache.streams.config.StreamsConfigurator;
import org.apache.streams.converter.LineReadWriteUtil;
-import org.apache.streams.core.*;
+import org.apache.streams.core.DatumStatusCountable;
+import org.apache.streams.core.DatumStatusCounter;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsPersistReader;
+import org.apache.streams.core.StreamsResultSet;
import org.apache.streams.jackson.StreamsJacksonMapper;
import org.joda.time.DateTime;
import org.slf4j.Logger;
@@ -46,9 +48,8 @@ import java.net.URI;
import java.net.URISyntaxException;
import java.security.PrivilegedExceptionAction;
import java.util.Collections;
-import java.util.Map;
-import java.util.Queue;
import java.util.List;
+import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -170,7 +171,7 @@ public class WebHdfsPersistReader implements StreamsPersistReader, DatumStatusCo
@Override
public void prepare(Object configurationObject) {
LOGGER.debug("Prepare");
- lineReaderUtil = LineReadWriteUtil.getInstance(hdfsConfiguration.getFields(), hdfsConfiguration.getFieldDelimiter(), hdfsConfiguration.getLineDelimiter());
+ lineReaderUtil = LineReadWriteUtil.getInstance(hdfsConfiguration);
connectToWebHDFS();
String pathString = hdfsConfiguration.getPath() + "/" + hdfsConfiguration.getReaderPath();
LOGGER.info("Path : {}", pathString);
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/19f69e5a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
index 6ace93b..492eccb 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
+++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
@@ -18,11 +18,8 @@
package org.apache.streams.hdfs;
-import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Joiner;
import com.google.common.base.Strings;
-import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileSystem;
@@ -31,9 +28,12 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.streams.config.ComponentConfigurator;
import org.apache.streams.config.StreamsConfigurator;
import org.apache.streams.converter.LineReadWriteUtil;
-import org.apache.streams.core.*;
+import org.apache.streams.core.DatumStatus;
+import org.apache.streams.core.DatumStatusCountable;
+import org.apache.streams.core.DatumStatusCounter;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsPersistWriter;
import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,7 +46,6 @@ import java.net.URISyntaxException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Date;
-import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.zip.GZIPOutputStream;
@@ -166,6 +165,8 @@ public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Cl
String line = lineWriterUtil.convertResultToString(streamsDatum);
writeInternal(line);
+ if( !line.endsWith(this.hdfsConfiguration.getLineDelimiter()))
+ writeInternal(this.hdfsConfiguration.getLineDelimiter());
int bytesInLine = line.getBytes().length;
totalRecordsWritten++;
@@ -268,7 +269,7 @@ public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Cl
@Override
public void prepare(Object configurationObject) {
mapper = StreamsJacksonMapper.getInstance();
- lineWriterUtil = LineReadWriteUtil.getInstance(hdfsConfiguration.getFields(), hdfsConfiguration.getFieldDelimiter(), hdfsConfiguration.getLineDelimiter());
+ lineWriterUtil = LineReadWriteUtil.getInstance(hdfsConfiguration);
connectToWebHDFS();
path = new Path(hdfsConfiguration.getPath() + "/" + hdfsConfiguration.getWriterPath());
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/19f69e5a/streams-contrib/streams-persist-hdfs/src/main/jsonschema/org/apache/streams/hdfs/HdfsConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/jsonschema/org/apache/streams/hdfs/HdfsConfiguration.json b/streams-contrib/streams-persist-hdfs/src/main/jsonschema/org/apache/streams/hdfs/HdfsConfiguration.json
index 61245c4..2853002 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/jsonschema/org/apache/streams/hdfs/HdfsConfiguration.json
+++ b/streams-contrib/streams-persist-hdfs/src/main/jsonschema/org/apache/streams/hdfs/HdfsConfiguration.json
@@ -7,6 +7,9 @@
"type": "object",
"javaType" : "org.apache.streams.hdfs.HdfsConfiguration",
"javaInterfaces": ["java.io.Serializable"],
+ "extends": {
+ "$ref": "../../../../../../../../../streams-components/streams-converters/src/main/jsonschema/org/apache/streams/converter/LineReadWriteConfiguration.json"
+ },
"properties": {
"scheme": {
"type": "string",
@@ -31,32 +34,8 @@
"description": "User"
},
"password": {
- "type": "string",
- "description": "Password"
- },
- "fields": {
- "type": "array",
- "items": {
- "type": "string"
- },
- "default": [
- "ID",
- "TS",
- "META",
- "DOC"
- ]
- },
- "field_delimiter": {
- "type": "string",
- "default": "\t"
- },
- "line_delimiter": {
- "type": "string",
- "default": "\n"
- },
- "encoding": {
"type": "string",
- "default": "UTF-8"
+ "description": "Password"
}
}
}
\ No newline at end of file