You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/11/08 21:12:56 UTC
[05/38] incubator-streams git commit: Writer can now write to
us-west-1 Simplified configurator
Writer can now write to us-west-1
Simplified configurator
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/25af3ad3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/25af3ad3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/25af3ad3
Branch: refs/heads/STREAMS-49
Commit: 25af3ad3dadf40d57eeb62479b3e73cd5170275e
Parents: 1788691
Author: sblackmon <sb...@apache.org>
Authored: Tue Oct 7 13:44:22 2014 -0500
Committer: sblackmon <sb...@apache.org>
Committed: Tue Oct 7 13:44:22 2014 -0500
----------------------------------------------------------------------
streams-contrib/streams-amazon-aws/pom.xml | 2 +-
.../org/apache/streams/s3/S3Configurator.java | 55 +++++++++-----------
.../org/apache/streams/s3/S3PersistReader.java | 11 ++--
.../org/apache/streams/s3/S3PersistWriter.java | 10 ++--
.../org/apache/streams/s3/S3Configuration.json | 18 +++++--
5 files changed, 55 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/25af3ad3/streams-contrib/streams-amazon-aws/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/pom.xml b/streams-contrib/streams-amazon-aws/pom.xml
index 57a67cb..c9b73ae 100644
--- a/streams-contrib/streams-amazon-aws/pom.xml
+++ b/streams-contrib/streams-amazon-aws/pom.xml
@@ -45,7 +45,7 @@
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk</artifactId>
- <version>1.7.5</version>
+ <version>1.8.11</version>
</dependency>
<dependency>
<groupId>org.apache.streams</groupId>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/25af3ad3/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3Configurator.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3Configurator.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3Configurator.java
index dfa0426..6bf1672 100644
--- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3Configurator.java
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3Configurator.java
@@ -18,7 +18,9 @@
package org.apache.streams.s3;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
import com.typesafe.config.Config;
+import com.typesafe.config.ConfigRenderOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -30,53 +32,46 @@ public class S3Configurator {
public static S3Configuration detectConfiguration(Config s3) {
- S3Configuration s3Configuration = new S3Configuration();
+ S3Configuration s3Configuration = null;
- s3Configuration.setBucket(s3.getString("bucket"));
- s3Configuration.setKey(s3.getString("key"));
- s3Configuration.setSecretKey(s3.getString("secretKey"));
-
- // The Amazon S3 Library defaults to HTTPS
- String protocol = (!s3.hasPath("protocol") ? "https": s3.getString("protocol")).toLowerCase();
-
- if(!(protocol.equals("https") || protocol.equals("http"))) {
- // you must specify either HTTP or HTTPS
- throw new RuntimeException("You must specify either HTTP or HTTPS as a protocol");
+ try {
+ s3Configuration = mapper.readValue(s3.root().render(ConfigRenderOptions.concise()), S3Configuration.class);
+ } catch (Exception e) {
+ e.printStackTrace();
+ LOGGER.warn("Could not parse S3Configuration");
}
- s3Configuration.setProtocol(protocol.toLowerCase());
-
return s3Configuration;
}
public static S3ReaderConfiguration detectReaderConfiguration(Config s3) {
- S3Configuration S3Configuration = detectConfiguration(s3);
- S3ReaderConfiguration s3ReaderConfiguration = mapper.convertValue(S3Configuration, S3ReaderConfiguration.class);
+ S3ReaderConfiguration s3Configuration = null;
- s3ReaderConfiguration.setReaderPath(s3.getString("readerPath"));
+ try {
+ s3Configuration = mapper.readValue(s3.root().render(ConfigRenderOptions.concise()), S3ReaderConfiguration.class);
+ } catch (Exception e) {
+ e.printStackTrace();
+ LOGGER.warn("Could not parse S3Configuration");
+ }
- return s3ReaderConfiguration;
+ return s3Configuration;
}
public static S3WriterConfiguration detectWriterConfiguration(Config s3) {
- S3Configuration s3Configuration = detectConfiguration(s3);
- S3WriterConfiguration s3WriterConfiguration = mapper.convertValue(s3Configuration, S3WriterConfiguration.class);
+ S3WriterConfiguration s3Configuration = null;
- String rootPath = s3.getString("writerPath");
-
- // if the root path doesn't end in a '/' then we need to force the '/' at the end of the path.
- s3WriterConfiguration.setWriterPath(rootPath + (rootPath.endsWith("/") ? "" : "/"));
-
- s3WriterConfiguration.setWriterFilePrefix(s3.hasPath("writerFilePrefix") ? s3.getString("writerFilePrefix") : "default");
+ try {
+ s3Configuration = mapper.readValue(s3.root().render(ConfigRenderOptions.concise()), S3WriterConfiguration.class);
+ } catch (Exception e) {
+ e.printStackTrace();
+ LOGGER.warn("Could not parse S3Configuration");
+ }
- if(s3.hasPath("maxFileSize"))
- s3WriterConfiguration.setMaxFileSize((long)s3.getInt("maxFileSize"));
- if(s3.hasPath("chunk"))
- s3WriterConfiguration.setChunk(s3.getBoolean("chunk"));
+ Preconditions.checkArgument(s3Configuration.getWriterPath().endsWith("/"), s3Configuration.getWriterPath() + " must end with '/'");
- return s3WriterConfiguration;
+ return s3Configuration;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/25af3ad3/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
index 4f62a06..5709f22 100644
--- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
@@ -21,12 +21,15 @@ import com.amazonaws.ClientConfiguration;
import com.amazonaws.Protocol;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.regions.Region;
+import com.amazonaws.regions.Regions;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.S3ClientOptions;
import com.amazonaws.services.s3.model.ListObjectsRequest;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Strings;
import com.google.common.collect.Queues;
import org.apache.streams.core.*;
import org.joda.time.DateTime;
@@ -104,13 +107,15 @@ public class S3PersistReader implements StreamsPersistReader, DatumStatusCountab
AWSCredentials credentials = new BasicAWSCredentials(s3ReaderConfiguration.getKey(), s3ReaderConfiguration.getSecretKey());
ClientConfiguration clientConfig = new ClientConfiguration();
- clientConfig.setProtocol(Protocol.valueOf(s3ReaderConfiguration.getProtocol().toUpperCase()));
+ clientConfig.setProtocol(Protocol.valueOf(s3ReaderConfiguration.getProtocol().toString()));
- // We want path style access
+ // We do not want path style access
S3ClientOptions clientOptions = new S3ClientOptions();
- clientOptions.setPathStyleAccess(true);
+ clientOptions.setPathStyleAccess(false);
this.amazonS3Client = new AmazonS3Client(credentials, clientConfig);
+ if( !Strings.isNullOrEmpty(s3ReaderConfiguration.getRegion()))
+ this.amazonS3Client.setRegion(Region.getRegion(Regions.fromName(s3ReaderConfiguration.getRegion())));
this.amazonS3Client.setS3ClientOptions(clientOptions);
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/25af3ad3/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java
index 058f748..9111265 100644
--- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java
@@ -21,6 +21,8 @@ import com.amazonaws.ClientConfiguration;
import com.amazonaws.Protocol;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.regions.Region;
+import com.amazonaws.regions.Regions;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.S3ClientOptions;
import com.fasterxml.jackson.core.JsonProcessingException;
@@ -256,13 +258,15 @@ public class S3PersistWriter implements StreamsPersistWriter, DatumStatusCountab
AWSCredentials credentials = new BasicAWSCredentials(s3WriterConfiguration.getKey(), s3WriterConfiguration.getSecretKey());
ClientConfiguration clientConfig = new ClientConfiguration();
- clientConfig.setProtocol(Protocol.valueOf(s3WriterConfiguration.getProtocol().toUpperCase()));
+ clientConfig.setProtocol(Protocol.valueOf(s3WriterConfiguration.getProtocol().toString()));
- // We want path style access
+ // We do not want path style access
S3ClientOptions clientOptions = new S3ClientOptions();
- clientOptions.setPathStyleAccess(true);
+ clientOptions.setPathStyleAccess(false);
this.amazonS3Client = new AmazonS3Client(credentials, clientConfig);
+ if( !Strings.isNullOrEmpty(s3WriterConfiguration.getRegion()))
+ this.amazonS3Client.setRegion(Region.getRegion(Regions.fromName(s3WriterConfiguration.getRegion())));
this.amazonS3Client.setS3ClientOptions(clientOptions);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/25af3ad3/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/jsonschema/org/apache/streams/s3/S3Configuration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/jsonschema/org/apache/streams/s3/S3Configuration.json b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/jsonschema/org/apache/streams/s3/S3Configuration.json
index 863668f..36e89d0 100644
--- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/jsonschema/org/apache/streams/s3/S3Configuration.json
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/jsonschema/org/apache/streams/s3/S3Configuration.json
@@ -7,19 +7,29 @@
"properties": {
"key": {
"type": "string",
- "description": "Your Amazon Key"
+ "description": "Your Amazon Key",
+ "required": true
},
"secretKey": {
"type": "string",
- "description": "Your Amazon Secret Key"
+ "description": "Your Amazon Secret Key",
+ "required": true
},
"bucket": {
"type": "string",
- "description": "The AWS bucket you want to write to"
+ "description": "Your AWS bucket",
+ "required": true
},
"protocol": {
"type": "string",
- "description": "Whether you are using HTTP or HTTPS"
+ "description": "Whether you are using HTTP or HTTPS",
+ "enum": ["HTTP", "HTTPS"],
+ "default": "HTTPS"
+ },
+ "region": {
+ "type": "string",
+ "description": "The AWS region where your bucket resides",
+ "required": true
}
}
}
\ No newline at end of file