Posted to commits@streams.apache.org by sb...@apache.org on 2014/05/08 17:44:07 UTC
[5/8] git commit: Changes based on feedback.
Changes based on feedback.
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/1a03d5f6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/1a03d5f6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/1a03d5f6
Branch: refs/heads/master
Commit: 1a03d5f67ae0b634f903733dd14a26236331690c
Parents: 4f30bd2
Author: Matthew Hager <Ma...@gmail.com>
Authored: Mon May 5 17:13:06 2014 -0500
Committer: Matthew Hager <Ma...@gmail.com>
Committed: Mon May 5 17:13:06 2014 -0500
----------------------------------------------------------------------
.../org/apache/streams/s3/S3Configurator.java | 1 +
.../streams/s3/S3ObjectInputStreamWrapper.java | 108 ++++++++++++-------
.../streams/s3/S3OutputStreamWrapper.java | 67 ++++++------
.../org/apache/streams/s3/S3PersistReader.java | 42 +++++---
.../apache/streams/s3/S3PersistReaderTask.java | 19 ++--
.../org/apache/streams/s3/S3PersistWriter.java | 5 +-
6 files changed, 140 insertions(+), 102 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/1a03d5f6/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3Configurator.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3Configurator.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3Configurator.java
index 8190404..3413ef7 100644
--- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3Configurator.java
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3Configurator.java
@@ -24,6 +24,7 @@ public class S3Configurator {
if(!(protocol.equals("https") || protocol.equals("http"))) {
// you must specify either HTTP or HTTPS
+ throw new RuntimeException("You must specify either HTTP or HTTPS as a protocol");
}
s3Configuration.setProtocol(protocol.toLowerCase());
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/1a03d5f6/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3ObjectInputStreamWrapper.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3ObjectInputStreamWrapper.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3ObjectInputStreamWrapper.java
index 2a2dba0..900ebfb 100644
--- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3ObjectInputStreamWrapper.java
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3ObjectInputStreamWrapper.java
@@ -9,28 +9,15 @@ import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
-import com.amazonaws.services.s3.model.S3Object;
-import com.amazonaws.services.s3.model.S3ObjectInputStream;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.io.InputStream;
-
/**
- * There is a stupid nuance associated with reading portions of files in S3. Everything occurs over
- * an Apache HTTP client object. Apache defaults to re-using the stream. So, if you only want to read
- * a small portion of the file. You must first "abort" the stream, then close. Otherwise, Apache will
- * exhaust the stream and transfer a ton of data attempting to do so.
- *
+ * There is a nuance associated with reading portions of files in S3. Everything occurs over
+ * an Apache HTTP client object. Apache, and therefore Amazon, defaults to re-using the stream.
+ * As a result, if you only intend to read a small portion of the file, you must first "abort" the
+ * stream, then close the 'inputStream'. Otherwise, Apache will exhaust the entire stream
+ * and transfer the entire file. If you are only reading the first 50 lines of a 5,000,000 line file,
+ * this becomes problematic.
*
- * Author Smashew
- * Date 2014-04-11
- *
- * After a few more days and some demos that had some issues with concurrency and high user load. This
- * was also discovered. There is an issue with the S3Object's HTTP connection not being released back
- * to the connection pool (until it times out) even once the item is garbage collected. So....
+ * This class operates as a wrapper to fix the aforementioned nuances.
*
* Reference:
* http://stackoverflow.com/questions/17782937/connectionpooltimeoutexception-when-iterating-objects-in-s3
@@ -43,39 +30,82 @@ public class S3ObjectInputStreamWrapper extends InputStream
private final S3ObjectInputStream is;
private boolean isClosed = false;
+ /**
+ * Create an input stream safely from the given S3Object.
+ * @param s3Object the S3Object whose content stream should be wrapped
+ */
public S3ObjectInputStreamWrapper(S3Object s3Object) {
this.s3Object = s3Object;
this.is = this.s3Object.getObjectContent();
}
- public int hashCode() { return this.is.hashCode(); }
- public boolean equals(Object obj) { return this.is.equals(obj); }
- public String toString() { return this.is.toString(); }
- public int read() throws IOException { return this.is.read(); }
- public int read(byte[] b) throws IOException { return this.is.read(b); }
- public int read(byte[] b, int off, int len) throws IOException { return this.is.read(b, off, len); }
- public long skip(long n) throws IOException { return this.is.skip(n); }
- public int available() throws IOException { return this.is.available(); }
- public boolean markSupported() { return this.is.markSupported(); }
- public synchronized void mark(int readlimit) { this.is.mark(readlimit); }
- public synchronized void reset() throws IOException { this.is.reset(); }
+ public int hashCode() {
+ return this.is.hashCode();
+ }
+
+ public boolean equals(Object obj) {
+ return this.is.equals(obj);
+ }
+
+ public String toString() {
+ return this.is.toString();
+ }
+
+ public int read() throws IOException {
+ return this.is.read();
+ }
+
+ public int read(byte[] b) throws IOException {
+ return this.is.read(b);
+ }
+
+ public int read(byte[] b, int off, int len) throws IOException {
+ return this.is.read(b, off, len);
+ }
+
+ public long skip(long n) throws IOException {
+ return this.is.skip(n);
+ }
+
+ public int available() throws IOException {
+ return this.is.available();
+ }
+
+ public boolean markSupported() {
+ return this.is.markSupported();
+ }
+
+ public synchronized void mark(int readlimit) {
+ this.is.mark(readlimit);
+ }
+
+ public synchronized void reset() throws IOException {
+ this.is.reset();
+ }
public void close() throws IOException {
ensureEverythingIsReleased();
}
- public void ensureEverythingIsReleased()
- {
+ public void ensureEverythingIsReleased() {
if(this.isClosed)
return;
- // THIS WHOLE CLASS IS JUST FOR THIS FUNCTION!
- // Amazon S3 - HTTP Exhaust all file contents issue
+
try {
+ // ensure that the S3 Object is closed properly.
+ this.s3Object.close();
+ } catch(Throwable e) {
+ LOGGER.warn("Problem Closing the S3Object[{}]: {}", s3Object.getKey(), e.getMessage());
+ }
+
+
+ try {
+ // Abort the stream
this.is.abort();
}
- catch(Exception e) {
- LOGGER.warn("S3Object[{}]: Issue Aborting Stream - {}", s3Object.getKey(), e.getMessage());
+ catch(Throwable e) {
+ LOGGER.warn("Problem Aborting S3Object[{}]: {}", s3Object.getKey(), e.getMessage());
}
// close the input Stream Safely
@@ -98,8 +128,8 @@ public class S3ObjectInputStreamWrapper extends InputStream
protected void finalize( ) throws Throwable
{
- try
- {
+ try {
+ // If there is an accidental leak where the user never called close(), release everything from the finalizer
ensureEverythingIsReleased();
super.finalize();
} catch(Exception e) {
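
For context, the wrapper above exists so that a caller can read only part of an S3 object without Apache's HTTP client draining the rest of it on close. A minimal usage sketch in Java follows; the client construction, bucket, key, and 50-line cutoff are hypothetical placeholders rather than values taken from this commit:

import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.S3Object;
import org.apache.streams.s3.S3ObjectInputStreamWrapper;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

public class PartialReadSketch {
    public static void main(String[] args) throws IOException {
        // Hypothetical client and object; substitute real credentials and names.
        AmazonS3Client client = new AmazonS3Client();
        S3Object s3Object = client.getObject("my-bucket", "path/to/large-file.tsv");

        // Wrapping the S3Object means close() aborts the underlying HTTP stream
        // instead of transferring the remainder of a potentially huge file.
        S3ObjectInputStreamWrapper is = new S3ObjectInputStreamWrapper(s3Object);
        BufferedReader reader = new BufferedReader(new InputStreamReader(is));
        try {
            for (int i = 0; i < 50; i++) {
                String line = reader.readLine();
                if (line == null)
                    break;
                System.out.println(line);
            }
        } finally {
            // Closing the reader closes the wrapper, which releases the S3Object,
            // aborts the stream, and returns the connection to the pool.
            reader.close();
        }
    }
}

Because close() aborts the S3ObjectInputStream before closing it, the connection goes back to the Apache pool immediately instead of waiting for the remaining bytes to transfer (or for a timeout).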
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/1a03d5f6/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3OutputStreamWrapper.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3OutputStreamWrapper.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3OutputStreamWrapper.java
index 8f55983..c488b48 100644
--- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3OutputStreamWrapper.java
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3OutputStreamWrapper.java
@@ -9,33 +9,12 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.*;
-
-import com.amazonaws.services.s3.AmazonS3Client;
-import com.amazonaws.services.s3.model.ObjectMetadata;
-import com.amazonaws.services.s3.model.PutObjectResult;
-import com.amazonaws.services.s3.transfer.TransferManager;
-import com.amazonaws.services.s3.transfer.Upload;
-import org.apache.commons.io.FilenameUtils;
-import org.joda.time.DateTime;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.*;
-import java.util.Date;
import java.util.Map;
/**
- *
- * Author: Smashew
- * Date: 2014-04-14
- *
- * Description:
- * This class uses ByteArrayOutputStreams to ensure files are written to S3 properly.
- *
- * There is a way to upload data in chunks (5mb or so) a peice, but the multi-part upload
- * is kind of a PITA to deal with.
- *
- * // TODO: This should be refactored to allow a user to specify if they want one large file instead of many small ones
+ * This class uses ByteArrayOutputStreams to ensure files are written to S3 properly. The stream is written to an
+ * in-memory ByteArrayOutputStream before it is finally written to Amazon S3. The maximum size a file is allowed
+ * to reach is controlled directly by the S3PersistWriter.
*/
public class S3OutputStreamWrapper extends OutputStream
{
@@ -47,11 +26,24 @@ public class S3OutputStreamWrapper extends OutputStream
private final String fileName;
private ByteArrayOutputStream outputStream;
private final Map<String, String> metaData;
-
private boolean isClosed = false;
- public S3OutputStreamWrapper(AmazonS3Client amazonS3Client, String bucketName, String path, String fileName, Map<String, String> metaData) throws IOException
- {
+ /**
+ * Create an OutputStream Wrapper
+ * @param amazonS3Client
+ * The Amazon S3 Client which will be handling the file
+ * @param bucketName
+ * The bucket name you wish to write to.
+ * @param path
+ * The path where the object will live
+ * @param fileName
+ * The file name you wish to write.
+ * @param metaData
+ * Any meta data that is to be written along with the object
+ * @throws IOException
+ * If there is an issue creating the stream.
+ */
+ public S3OutputStreamWrapper(AmazonS3Client amazonS3Client, String bucketName, String path, String fileName, Map<String, String> metaData) throws IOException {
this.amazonS3Client = amazonS3Client;
this.bucketName = bucketName;
this.path = path;
@@ -60,14 +52,21 @@ public class S3OutputStreamWrapper extends OutputStream
this.outputStream = new ByteArrayOutputStream();
}
- /*
- * The Methods that are overriden to support the 'OutputStream' object.
- */
+ public void write(int b) throws IOException {
+ this.outputStream.write(b);
+ }
- public void write(int b) throws IOException { this.outputStream.write(b); }
- public void write(byte[] b) throws IOException { this.outputStream.write(b); }
- public void write(byte[] b, int off, int len) throws IOException { this.outputStream.write(b, off, len); }
- public void flush() throws IOException { this.outputStream.flush(); }
+ public void write(byte[] b) throws IOException {
+ this.outputStream.write(b);
+ }
+
+ public void write(byte[] b, int off, int len) throws IOException {
+ this.outputStream.write(b, off, len);
+ }
+
+ public void flush() throws IOException {
+ this.outputStream.flush();
+ }
/**
* Whenever the output stream is closed we are going to kick the ByteArrayOutputStream off to Amazon S3.
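
The truncated comment above describes the key behavior: bytes accumulate in the in-memory buffer and are only pushed to Amazon S3 when the stream is closed. The hunk ends before that upload code, so the following is only a sketch of what the hand-off could look like with this SDK; the helper name and its exact shape are assumptions, not the method body from this commit:

import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.ObjectMetadata;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.util.Map;

class BufferedS3UploadSketch {

    // Pushes an in-memory buffer to S3 as a single object. Illustrative only.
    static void uploadBuffer(AmazonS3Client client, String bucketName, String key,
                             ByteArrayOutputStream buffer, Map<String, String> metaData) {
        byte[] bytes = buffer.toByteArray();

        ObjectMetadata objectMetadata = new ObjectMetadata();
        objectMetadata.setContentLength(bytes.length);
        if (metaData != null)
            objectMetadata.setUserMetadata(metaData);

        // The whole buffered file is transferred in one PUT, which is why the size a
        // file may grow to has to be capped by the S3PersistWriter configuration.
        client.putObject(bucketName, key, new ByteArrayInputStream(bytes), objectMetadata);
    }
}

Keeping the whole file in a ByteArrayOutputStream trades memory for simplicity: a single putObject call avoids the multi-part upload API, at the cost of holding the entire file in memory until close.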
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/1a03d5f6/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
index a987a47..938dc66 100644
--- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
@@ -15,11 +15,7 @@ import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
import java.math.BigInteger;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Queue;
@@ -43,13 +39,33 @@ public class S3PersistReader implements StreamsPersistReader, DatumStatusCountab
protected DatumStatusCounter countersTotal = new DatumStatusCounter();
protected DatumStatusCounter countersCurrent = new DatumStatusCounter();
- public AmazonS3Client getAmazonS3Client() { return this.amazonS3Client; }
- public S3ReaderConfiguration getS3ReaderConfiguration() { return this.s3ReaderConfiguration; }
- public String getBucketName() { return this.s3ReaderConfiguration.getBucket(); }
- public StreamsResultSet readNew(BigInteger sequence) { return null; }
- public StreamsResultSet readRange(DateTime start, DateTime end) { return null; }
- public DatumStatusCounter getDatumStatusCounter() { return countersTotal; }
- public Collection<String> getFiles() { return this.files; }
+ public AmazonS3Client getAmazonS3Client() {
+ return this.amazonS3Client;
+ }
+
+ public S3ReaderConfiguration getS3ReaderConfiguration() {
+ return this.s3ReaderConfiguration;
+ }
+
+ public String getBucketName() {
+ return this.s3ReaderConfiguration.getBucket();
+ }
+
+ public StreamsResultSet readNew(BigInteger sequence) {
+ return null;
+ }
+
+ public StreamsResultSet readRange(DateTime start, DateTime end) {
+ return null;
+ }
+
+ public DatumStatusCounter getDatumStatusCounter() {
+ return countersTotal;
+ }
+
+ public Collection<String> getFiles() {
+ return this.files;
+ }
public S3PersistReader(S3ReaderConfiguration s3ReaderConfiguration) {
this.s3ReaderConfiguration = s3ReaderConfiguration;
@@ -111,7 +127,9 @@ public class S3PersistReader implements StreamsPersistReader, DatumStatusCountab
this.executor = Executors.newSingleThreadExecutor();
}
- public void cleanUp() { }
+ public void cleanUp() {
+ // no Op
+ }
public StreamsResultSet readAll() {
startStream();
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/1a03d5f6/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java
index 70015fb..5b4abe4 100644
--- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java
@@ -23,20 +23,17 @@ public class S3PersistReaderTask implements Runnable {
@Override
public void run() {
- for(String file : reader.getFiles())
- {
- // Create our buffered reader
+ for(String file : reader.getFiles()) {
+ // Create our buffered reader
S3ObjectInputStreamWrapper is = new S3ObjectInputStreamWrapper(reader.getAmazonS3Client().getObject(reader.getBucketName(), file));
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(is));
LOGGER.info("Reading: {} ", file);
String line = "";
try {
- while((line = bufferedReader.readLine()) != null)
- {
- if( !Strings.isNullOrEmpty(line) )
- {
+ while((line = bufferedReader.readLine()) != null) {
+ if( !Strings.isNullOrEmpty(line) ) {
reader.countersCurrent.incrementAttempt();
String[] fields = line.split(Character.toString(reader.DELIMITER));
StreamsDatum entry = new StreamsDatum(fields[3], fields[0]);
@@ -44,9 +41,7 @@ public class S3PersistReaderTask implements Runnable {
reader.countersCurrent.incrementStatus(DatumStatus.SUCCESS);
}
}
- }
- catch (Exception e)
- {
+ } catch (Exception e) {
e.printStackTrace();
LOGGER.warn(e.getMessage());
reader.countersCurrent.incrementStatus(DatumStatus.FAIL);
@@ -57,7 +52,6 @@ public class S3PersistReaderTask implements Runnable {
try {
closeSafely(file, is);
} catch (Exception e) {
- e.printStackTrace();
LOGGER.error(e.getMessage());
}
}
@@ -66,8 +60,7 @@ public class S3PersistReaderTask implements Runnable {
private static void closeSafely(String file, Closeable closeable) {
try {
closeable.close();
- }
- catch(Exception e) {
+ } catch(Exception e) {
LOGGER.error("There was an issue closing file: {}", file);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/1a03d5f6/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java
index c46ff03..3685012 100644
--- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java
@@ -9,7 +9,6 @@ import com.amazonaws.services.s3.S3ClientOptions;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Strings;
-import com.google.common.util.concurrent.AtomicDouble;
import org.apache.streams.core.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -188,7 +187,6 @@ public class S3PersistWriter implements StreamsPersistWriter, DatumStatusCountab
}
}
-
private String convertResultToString(StreamsDatum entry)
{
String metadata = null;
@@ -227,8 +225,7 @@ public class S3PersistWriter implements StreamsPersistWriter, DatumStatusCountab
this.objectMapper = new ObjectMapper();
// Create the credentials Object
- if(this.amazonS3Client == null)
- {
+ if(this.amazonS3Client == null) {
AWSCredentials credentials = new BasicAWSCredentials(s3WriterConfiguration.getKey(), s3WriterConfiguration.getSecretKey());
ClientConfiguration clientConfig = new ClientConfiguration();