You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2016/06/01 17:53:19 UTC

[03/46] incubator-streams git commit: refactor persist-hdfs and persist-s3 for STREAMS-377

refactor persist-hdfs and persist-s3 for STREAMS-377


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/19f69e5a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/19f69e5a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/19f69e5a

Branch: refs/heads/STREAMS-389
Commit: 19f69e5ae37aead22f033d720f8cfb84ad20bb91
Parents: 1b2891d
Author: Steve Blackmon (@steveblackmon) <sb...@apache.org>
Authored: Sun Dec 13 23:27:05 2015 -0600
Committer: Steve Blackmon (@steveblackmon) <sb...@apache.org>
Committed: Sun Dec 13 23:27:05 2015 -0600

----------------------------------------------------------------------
 .../org/apache/streams/s3/S3PersistReader.java  |  8 ++++--
 .../org/apache/streams/s3/S3PersistWriter.java  | 22 +++++++++++----
 .../org/apache/streams/s3/S3Configuration.json  | 23 ++--------------
 .../streams/hdfs/WebHdfsPersistReader.java      | 13 +++++----
 .../streams/hdfs/WebHdfsPersistWriter.java      | 15 +++++-----
 .../apache/streams/hdfs/HdfsConfiguration.json  | 29 +++-----------------
 6 files changed, 44 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/19f69e5a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
index b186288..702df71 100644
--- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
@@ -32,7 +32,11 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Strings;
 import com.google.common.collect.Queues;
 import org.apache.streams.converter.LineReadWriteUtil;
-import org.apache.streams.core.*;
+import org.apache.streams.core.DatumStatusCountable;
+import org.apache.streams.core.DatumStatusCounter;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsPersistReader;
+import org.apache.streams.core.StreamsResultSet;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -108,7 +112,7 @@ public class S3PersistReader implements StreamsPersistReader, DatumStatusCountab
 
     public void prepare(Object configurationObject) {
 
-        lineReaderUtil = LineReadWriteUtil.getInstance(s3ReaderConfiguration.getFields(), s3ReaderConfiguration.getFieldDelimiter(), s3ReaderConfiguration.getLineDelimiter());
+        lineReaderUtil = LineReadWriteUtil.getInstance(s3ReaderConfiguration);
         // Connect to S3
         synchronized (this)
         {

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/19f69e5a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java
index e426983..3686f55 100644
--- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java
@@ -25,21 +25,31 @@ import com.amazonaws.regions.Region;
 import com.amazonaws.regions.Regions;
 import com.amazonaws.services.s3.AmazonS3Client;
 import com.amazonaws.services.s3.S3ClientOptions;
-import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import org.apache.streams.config.ComponentConfigurator;
 import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.converter.LineReadWriteUtil;
-import org.apache.streams.core.*;
-import org.apache.streams.hdfs.WebHdfsPersistWriter;
+import org.apache.streams.core.DatumStatus;
+import org.apache.streams.core.DatumStatusCountable;
+import org.apache.streams.core.DatumStatusCounter;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsPersistWriter;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.*;
-import java.util.*;
+import java.io.Flushable;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -234,7 +244,7 @@ public class S3PersistWriter implements StreamsPersistWriter, DatumStatusCountab
 
     public void prepare(Object configurationObject) {
 
-        lineWriterUtil = LineReadWriteUtil.getInstance(s3WriterConfiguration.getFields(), s3WriterConfiguration.getFieldDelimiter(), s3WriterConfiguration.getLineDelimiter());
+        lineWriterUtil = LineReadWriteUtil.getInstance(s3WriterConfiguration);
 
         // Connect to S3
         synchronized (this) {

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/19f69e5a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/jsonschema/org/apache/streams/s3/S3Configuration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/jsonschema/org/apache/streams/s3/S3Configuration.json b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/jsonschema/org/apache/streams/s3/S3Configuration.json
index 7f2e9e5..403bfac 100644
--- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/jsonschema/org/apache/streams/s3/S3Configuration.json
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/jsonschema/org/apache/streams/s3/S3Configuration.json
@@ -7,6 +7,9 @@
     "type": "object",
     "javaType" : "org.apache.streams.s3.S3Configuration",
     "javaInterfaces": ["java.io.Serializable"],
+    "extends": {
+        "$ref": "../../../../../../../../../../streams-components/streams-converters/src/main/jsonschema/org/apache/streams/converter/LineReadWriteConfiguration.json"
+    },
     "properties": {
         "key": {
             "type": "string",
@@ -33,26 +36,6 @@
             "type": "string",
             "description": "The AWS region where your bucket resides",
             "required": false
-        },
-        "fields": {
-            "type": "array",
-            "items": {
-                "type": "string"
-            },
-            "default": [
-                "ID",
-                "TS",
-                "META",
-                "DOC"
-            ]
-        },
-        "field_delimiter": {
-            "type": "string",
-            "default": "\t"
-        },
-        "line_delimiter": {
-            "type": "string",
-            "default": "\n"
         }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/19f69e5a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
index ffdeb4c..24c9737 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
+++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
@@ -18,7 +18,6 @@
 
 package org.apache.streams.hdfs;
 
-import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
@@ -28,13 +27,16 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.streams.config.ComponentConfigurator;
 import org.apache.streams.config.StreamsConfiguration;
 import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.converter.LineReadWriteUtil;
-import org.apache.streams.core.*;
+import org.apache.streams.core.DatumStatusCountable;
+import org.apache.streams.core.DatumStatusCounter;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsPersistReader;
+import org.apache.streams.core.StreamsResultSet;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
@@ -46,9 +48,8 @@ import java.net.URI;
 import java.net.URISyntaxException;
 import java.security.PrivilegedExceptionAction;
 import java.util.Collections;
-import java.util.Map;
-import java.util.Queue;
 import java.util.List;
+import java.util.Queue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -170,7 +171,7 @@ public class WebHdfsPersistReader implements StreamsPersistReader, DatumStatusCo
     @Override
     public void prepare(Object configurationObject) {
         LOGGER.debug("Prepare");
-        lineReaderUtil = LineReadWriteUtil.getInstance(hdfsConfiguration.getFields(), hdfsConfiguration.getFieldDelimiter(), hdfsConfiguration.getLineDelimiter());
+        lineReaderUtil = LineReadWriteUtil.getInstance(hdfsConfiguration);
         connectToWebHDFS();
         String pathString = hdfsConfiguration.getPath() + "/" + hdfsConfiguration.getReaderPath();
         LOGGER.info("Path : {}", pathString);

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/19f69e5a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
index 6ace93b..492eccb 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
+++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
@@ -18,11 +18,8 @@
 
 package org.apache.streams.hdfs;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Joiner;
 import com.google.common.base.Strings;
-import com.google.common.collect.Lists;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FileSystem;
@@ -31,9 +28,12 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.streams.config.ComponentConfigurator;
 import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.converter.LineReadWriteUtil;
-import org.apache.streams.core.*;
+import org.apache.streams.core.DatumStatus;
+import org.apache.streams.core.DatumStatusCountable;
+import org.apache.streams.core.DatumStatusCounter;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsPersistWriter;
 import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,7 +46,6 @@ import java.net.URISyntaxException;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Date;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Queue;
 import java.util.zip.GZIPOutputStream;
@@ -166,6 +165,8 @@ public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Cl
 
             String line = lineWriterUtil.convertResultToString(streamsDatum);
             writeInternal(line);
+            if( !line.endsWith(this.hdfsConfiguration.getLineDelimiter()))
+                writeInternal(this.hdfsConfiguration.getLineDelimiter());
             int bytesInLine = line.getBytes().length;
 
             totalRecordsWritten++;
@@ -268,7 +269,7 @@ public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Cl
     @Override
     public void prepare(Object configurationObject) {
         mapper = StreamsJacksonMapper.getInstance();
-        lineWriterUtil = LineReadWriteUtil.getInstance(hdfsConfiguration.getFields(), hdfsConfiguration.getFieldDelimiter(), hdfsConfiguration.getLineDelimiter());
+        lineWriterUtil = LineReadWriteUtil.getInstance(hdfsConfiguration);
         connectToWebHDFS();
         path = new Path(hdfsConfiguration.getPath() + "/" + hdfsConfiguration.getWriterPath());
     }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/19f69e5a/streams-contrib/streams-persist-hdfs/src/main/jsonschema/org/apache/streams/hdfs/HdfsConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/jsonschema/org/apache/streams/hdfs/HdfsConfiguration.json b/streams-contrib/streams-persist-hdfs/src/main/jsonschema/org/apache/streams/hdfs/HdfsConfiguration.json
index 61245c4..2853002 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/jsonschema/org/apache/streams/hdfs/HdfsConfiguration.json
+++ b/streams-contrib/streams-persist-hdfs/src/main/jsonschema/org/apache/streams/hdfs/HdfsConfiguration.json
@@ -7,6 +7,9 @@
     "type": "object",
     "javaType" : "org.apache.streams.hdfs.HdfsConfiguration",
     "javaInterfaces": ["java.io.Serializable"],
+    "extends": {
+        "$ref": "../../../../../../../../../streams-components/streams-converters/src/main/jsonschema/org/apache/streams/converter/LineReadWriteConfiguration.json"
+    },
     "properties": {
         "scheme": {
             "type": "string",
@@ -31,32 +34,8 @@
             "description": "User"
         },
         "password": {
-            "type": "string",
-            "description": "Password"
-        },
-        "fields": {
-          "type": "array",
-          "items": {
-            "type": "string"
-          },
-          "default": [
-            "ID",
-            "TS",
-            "META",
-            "DOC"
-          ]
-        },
-        "field_delimiter": {
-          "type": "string",
-          "default": "\t"
-        },
-        "line_delimiter": {
-          "type": "string",
-          "default": "\n"
-        },
-        "encoding": {
           "type": "string",
-          "default": "UTF-8"
+          "description": "Password"
         }
     }
 }
\ No newline at end of file