You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/11/08 21:12:56 UTC

[05/38] incubator-streams git commit: Writer can now write to us-west-1; Simplified configurator

Writer can now write to us-west-1
Simplified configurator


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/25af3ad3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/25af3ad3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/25af3ad3

Branch: refs/heads/STREAMS-49
Commit: 25af3ad3dadf40d57eeb62479b3e73cd5170275e
Parents: 1788691
Author: sblackmon <sb...@apache.org>
Authored: Tue Oct 7 13:44:22 2014 -0500
Committer: sblackmon <sb...@apache.org>
Committed: Tue Oct 7 13:44:22 2014 -0500

----------------------------------------------------------------------
 streams-contrib/streams-amazon-aws/pom.xml      |  2 +-
 .../org/apache/streams/s3/S3Configurator.java   | 55 +++++++++-----------
 .../org/apache/streams/s3/S3PersistReader.java  | 11 ++--
 .../org/apache/streams/s3/S3PersistWriter.java  | 10 ++--
 .../org/apache/streams/s3/S3Configuration.json  | 18 +++++--
 5 files changed, 55 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/25af3ad3/streams-contrib/streams-amazon-aws/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/pom.xml b/streams-contrib/streams-amazon-aws/pom.xml
index 57a67cb..c9b73ae 100644
--- a/streams-contrib/streams-amazon-aws/pom.xml
+++ b/streams-contrib/streams-amazon-aws/pom.xml
@@ -45,7 +45,7 @@
 	        <dependency>
 	            <groupId>com.amazonaws</groupId>
 	            <artifactId>aws-java-sdk</artifactId>
-	            <version>1.7.5</version>
+	            <version>1.8.11</version>
 	        </dependency>
             <dependency>
                 <groupId>org.apache.streams</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/25af3ad3/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3Configurator.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3Configurator.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3Configurator.java
index dfa0426..6bf1672 100644
--- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3Configurator.java
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3Configurator.java
@@ -18,7 +18,9 @@
 package org.apache.streams.s3;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
 import com.typesafe.config.Config;
+import com.typesafe.config.ConfigRenderOptions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -30,53 +32,46 @@ public class S3Configurator {
 
     public static S3Configuration detectConfiguration(Config s3) {
 
-        S3Configuration s3Configuration = new S3Configuration();
+        S3Configuration s3Configuration = null;
 
-        s3Configuration.setBucket(s3.getString("bucket"));
-        s3Configuration.setKey(s3.getString("key"));
-        s3Configuration.setSecretKey(s3.getString("secretKey"));
-
-        // The Amazon S3 Library defaults to HTTPS
-        String protocol = (!s3.hasPath("protocol") ? "https": s3.getString("protocol")).toLowerCase();
-
-        if(!(protocol.equals("https") || protocol.equals("http"))) {
-            // you must specify either HTTP or HTTPS
-            throw new RuntimeException("You must specify either HTTP or HTTPS as a protocol");
+        try {
+            s3Configuration = mapper.readValue(s3.root().render(ConfigRenderOptions.concise()), S3Configuration.class);
+        } catch (Exception e) {
+            e.printStackTrace();
+            LOGGER.warn("Could not parse S3Configuration");
         }
 
-        s3Configuration.setProtocol(protocol.toLowerCase());
-
         return s3Configuration;
     }
 
     public static S3ReaderConfiguration detectReaderConfiguration(Config s3) {
 
-        S3Configuration S3Configuration = detectConfiguration(s3);
-        S3ReaderConfiguration s3ReaderConfiguration = mapper.convertValue(S3Configuration, S3ReaderConfiguration.class);
+        S3ReaderConfiguration s3Configuration = null;
 
-        s3ReaderConfiguration.setReaderPath(s3.getString("readerPath"));
+        try {
+            s3Configuration = mapper.readValue(s3.root().render(ConfigRenderOptions.concise()), S3ReaderConfiguration.class);
+        } catch (Exception e) {
+            e.printStackTrace();
+            LOGGER.warn("Could not parse S3Configuration");
+        }
 
-        return s3ReaderConfiguration;
+        return s3Configuration;
     }
 
     public static S3WriterConfiguration detectWriterConfiguration(Config s3) {
 
-        S3Configuration s3Configuration = detectConfiguration(s3);
-        S3WriterConfiguration s3WriterConfiguration  = mapper.convertValue(s3Configuration, S3WriterConfiguration.class);
+        S3WriterConfiguration s3Configuration = null;
 
-        String rootPath = s3.getString("writerPath");
-
-        // if the root path doesn't end in a '/' then we need to force the '/' at the end of the path.
-        s3WriterConfiguration.setWriterPath(rootPath + (rootPath.endsWith("/") ? "" : "/"));
-
-        s3WriterConfiguration.setWriterFilePrefix(s3.hasPath("writerFilePrefix") ? s3.getString("writerFilePrefix") : "default");
+        try {
+            s3Configuration = mapper.readValue(s3.root().render(ConfigRenderOptions.concise()), S3WriterConfiguration.class);
+        } catch (Exception e) {
+            e.printStackTrace();
+            LOGGER.warn("Could not parse S3Configuration");
+        }
 
-        if(s3.hasPath("maxFileSize"))
-            s3WriterConfiguration.setMaxFileSize((long)s3.getInt("maxFileSize"));
-        if(s3.hasPath("chunk"))
-            s3WriterConfiguration.setChunk(s3.getBoolean("chunk"));
+        Preconditions.checkArgument(s3Configuration.getWriterPath().endsWith("/"), s3Configuration.getWriterPath() + " must end with '/'");
 
-        return s3WriterConfiguration;
+        return s3Configuration;
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/25af3ad3/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
index 4f62a06..5709f22 100644
--- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
@@ -21,12 +21,15 @@ import com.amazonaws.ClientConfiguration;
 import com.amazonaws.Protocol;
 import com.amazonaws.auth.AWSCredentials;
 import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.regions.Region;
+import com.amazonaws.regions.Regions;
 import com.amazonaws.services.s3.AmazonS3Client;
 import com.amazonaws.services.s3.S3ClientOptions;
 import com.amazonaws.services.s3.model.ListObjectsRequest;
 import com.amazonaws.services.s3.model.ObjectListing;
 import com.amazonaws.services.s3.model.S3ObjectSummary;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Strings;
 import com.google.common.collect.Queues;
 import org.apache.streams.core.*;
 import org.joda.time.DateTime;
@@ -104,13 +107,15 @@ public class S3PersistReader implements StreamsPersistReader, DatumStatusCountab
             AWSCredentials credentials = new BasicAWSCredentials(s3ReaderConfiguration.getKey(), s3ReaderConfiguration.getSecretKey());
 
             ClientConfiguration clientConfig = new ClientConfiguration();
-            clientConfig.setProtocol(Protocol.valueOf(s3ReaderConfiguration.getProtocol().toUpperCase()));
+            clientConfig.setProtocol(Protocol.valueOf(s3ReaderConfiguration.getProtocol().toString()));
 
-            // We want path style access
+            // We do not want path style access
             S3ClientOptions clientOptions = new S3ClientOptions();
-            clientOptions.setPathStyleAccess(true);
+            clientOptions.setPathStyleAccess(false);
 
             this.amazonS3Client = new AmazonS3Client(credentials, clientConfig);
+            if( !Strings.isNullOrEmpty(s3ReaderConfiguration.getRegion()))
+                this.amazonS3Client.setRegion(Region.getRegion(Regions.fromName(s3ReaderConfiguration.getRegion())));
             this.amazonS3Client.setS3ClientOptions(clientOptions);
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/25af3ad3/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java
index 058f748..9111265 100644
--- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java
@@ -21,6 +21,8 @@ import com.amazonaws.ClientConfiguration;
 import com.amazonaws.Protocol;
 import com.amazonaws.auth.AWSCredentials;
 import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.regions.Region;
+import com.amazonaws.regions.Regions;
 import com.amazonaws.services.s3.AmazonS3Client;
 import com.amazonaws.services.s3.S3ClientOptions;
 import com.fasterxml.jackson.core.JsonProcessingException;
@@ -256,13 +258,15 @@ public class S3PersistWriter implements StreamsPersistWriter, DatumStatusCountab
                 AWSCredentials credentials = new BasicAWSCredentials(s3WriterConfiguration.getKey(), s3WriterConfiguration.getSecretKey());
 
                 ClientConfiguration clientConfig = new ClientConfiguration();
-                clientConfig.setProtocol(Protocol.valueOf(s3WriterConfiguration.getProtocol().toUpperCase()));
+                clientConfig.setProtocol(Protocol.valueOf(s3WriterConfiguration.getProtocol().toString()));
 
-                // We want path style access
+                // We do not want path style access
                 S3ClientOptions clientOptions = new S3ClientOptions();
-                clientOptions.setPathStyleAccess(true);
+                clientOptions.setPathStyleAccess(false);
 
                 this.amazonS3Client = new AmazonS3Client(credentials, clientConfig);
+                if( !Strings.isNullOrEmpty(s3WriterConfiguration.getRegion()))
+                    this.amazonS3Client.setRegion(Region.getRegion(Regions.fromName(s3WriterConfiguration.getRegion())));
                 this.amazonS3Client.setS3ClientOptions(clientOptions);
             }
         }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/25af3ad3/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/jsonschema/org/apache/streams/s3/S3Configuration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/jsonschema/org/apache/streams/s3/S3Configuration.json b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/jsonschema/org/apache/streams/s3/S3Configuration.json
index 863668f..36e89d0 100644
--- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/jsonschema/org/apache/streams/s3/S3Configuration.json
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/jsonschema/org/apache/streams/s3/S3Configuration.json
@@ -7,19 +7,29 @@
     "properties": {
         "key": {
             "type": "string",
-            "description": "Your Amazon Key"
+            "description": "Your Amazon Key",
+            "required": true
         },
         "secretKey": {
             "type": "string",
-            "description": "Your Amazon Secret Key"
+            "description": "Your Amazon Secret Key",
+            "required": true
         },
         "bucket": {
             "type": "string",
-            "description": "The AWS bucket you want to write to"
+            "description": "Your AWS bucket",
+            "required": true
         },
         "protocol": {
             "type": "string",
-            "description": "Whether you are using HTTP or HTTPS"
+            "description": "Whether you are using HTTP or HTTPS",
+            "enum": ["HTTP", "HTTPS"],
+            "default": "HTTPS"
+        },
+        "region": {
+            "type": "string",
+            "description": "The AWS region where your bucket resides",
+            "required": true
         }
     }
 }
\ No newline at end of file