You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/05/08 17:44:07 UTC

[5/8] git commit: Changes based on feedback.

Changes based on feedback.


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/1a03d5f6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/1a03d5f6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/1a03d5f6

Branch: refs/heads/master
Commit: 1a03d5f67ae0b634f903733dd14a26236331690c
Parents: 4f30bd2
Author: Matthew Hager <Ma...@gmail.com>
Authored: Mon May 5 17:13:06 2014 -0500
Committer: Matthew Hager <Ma...@gmail.com>
Committed: Mon May 5 17:13:06 2014 -0500

----------------------------------------------------------------------
 .../org/apache/streams/s3/S3Configurator.java   |   1 +
 .../streams/s3/S3ObjectInputStreamWrapper.java  | 108 ++++++++++++-------
 .../streams/s3/S3OutputStreamWrapper.java       |  67 ++++++------
 .../org/apache/streams/s3/S3PersistReader.java  |  42 +++++---
 .../apache/streams/s3/S3PersistReaderTask.java  |  19 ++--
 .../org/apache/streams/s3/S3PersistWriter.java  |   5 +-
 6 files changed, 140 insertions(+), 102 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/1a03d5f6/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3Configurator.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3Configurator.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3Configurator.java
index 8190404..3413ef7 100644
--- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3Configurator.java
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3Configurator.java
@@ -24,6 +24,7 @@ public class S3Configurator {
 
         if(!(protocol.equals("https") || protocol.equals("http"))) {
             // you must specify either HTTP or HTTPS
+            throw new RuntimeException("You must specify either HTTP or HTTPS as a protocol");
         }
 
         s3Configuration.setProtocol(protocol.toLowerCase());

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/1a03d5f6/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3ObjectInputStreamWrapper.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3ObjectInputStreamWrapper.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3ObjectInputStreamWrapper.java
index 2a2dba0..900ebfb 100644
--- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3ObjectInputStreamWrapper.java
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3ObjectInputStreamWrapper.java
@@ -9,28 +9,15 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.io.InputStream;
 
-import com.amazonaws.services.s3.model.S3Object;
-import com.amazonaws.services.s3.model.S3ObjectInputStream;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.io.InputStream;
-
 /**
- * There is a stupid nuance associated with reading portions of files in S3. Everything occurs over
- * an Apache HTTP client object. Apache defaults to re-using the stream. So, if you only want to read
- * a small portion of the file. You must first "abort" the stream, then close. Otherwise, Apache will
- * exhaust the stream and transfer a ton of data attempting to do so.
- *
+ * There is a nuance associated with reading portions of files in S3. Everything occurs over
+ * an Apache HTTP client object. Apache (and therefore Amazon) defaults to re-using the stream.
+ * As a result, if you only intend to read a small portion of the file, you must first "abort" the
+ * stream, then close the 'inputStream'. Otherwise, Apache will exhaust the entire stream
+ * and transfer the entire file. If you are only reading the first 50 lines of a 5,000,000 line file
+ * this becomes problematic.
  *
- * Author   Smashew
- * Date     2014-04-11
- *
- * After a few more days and some demos that had some issues with concurrency and high user load. This
- * was also discovered. There is an issue with the S3Object's HTTP connection not being released back
- * to the connection pool (until it times out) even once the item is garbage collected. So....
+ * This class operates as a wrapper to fix the aforementioned nuances.
  *
  * Reference:
  * http://stackoverflow.com/questions/17782937/connectionpooltimeoutexception-when-iterating-objects-in-s3
@@ -43,39 +30,82 @@ public class S3ObjectInputStreamWrapper extends InputStream
     private final S3ObjectInputStream is;
     private boolean isClosed = false;
 
+    /**
+     * Wrap the content stream of the given S3Object so that it can be released safely.
+     * @param s3Object the S3 object whose content stream is wrapped
+     */
     public S3ObjectInputStreamWrapper(S3Object s3Object) {
         this.s3Object = s3Object;
         this.is = this.s3Object.getObjectContent();
     }
 
-    public int hashCode()                                           { return this.is.hashCode(); }
-    public boolean equals(Object obj)                               { return this.is.equals(obj); }
-    public String toString()                                        { return this.is.toString(); }
-    public int read() throws IOException                            { return this.is.read(); }
-    public int read(byte[] b) throws IOException                    { return this.is.read(b); }
-    public int read(byte[] b, int off, int len) throws IOException  { return this.is.read(b, off, len); }
-    public long skip(long n) throws IOException                     { return this.is.skip(n); }
-    public int available() throws IOException                       { return this.is.available(); }
-    public boolean markSupported()                                  { return this.is.markSupported(); }
-    public synchronized void mark(int readlimit)                    { this.is.mark(readlimit); }
-    public synchronized void reset() throws IOException             { this.is.reset(); }
+    public int hashCode() {
+        return this.is.hashCode();
+    }
+
+    public boolean equals(Object obj) {
+        return this.is.equals(obj);
+    }
+
+    public String toString() {
+        return this.is.toString();
+    }
+
+    public int read() throws IOException {
+        return this.is.read();
+    }
+
+    public int read(byte[] b) throws IOException {
+        return this.is.read(b);
+    }
+
+    public int read(byte[] b, int off, int len) throws IOException {
+        return this.is.read(b, off, len);
+    }
+
+    public long skip(long n) throws IOException {
+        return this.is.skip(n);
+    }
+
+    public int available() throws IOException {
+        return this.is.available();
+    }
+
+    public boolean markSupported() {
+        return this.is.markSupported();
+    }
+
+    public synchronized void mark(int readlimit) {
+        this.is.mark(readlimit);
+    }
+
+    public synchronized void reset() throws IOException {
+        this.is.reset();
+    }
 
     public void close() throws IOException {
         ensureEverythingIsReleased();
     }
 
-    public void ensureEverythingIsReleased()
-    {
+    public void ensureEverythingIsReleased() {
         if(this.isClosed)
             return;
 
-        // THIS WHOLE CLASS IS JUST FOR THIS FUNCTION!
-        // Amazon S3 - HTTP Exhaust all file contents issue
+
         try {
+            // Ensure the S3Object is closed. NOTE(review): this may close (and drain) the content stream before abort() below runs; consider aborting first.
+            this.s3Object.close();
+        } catch(Throwable e) {
+            LOGGER.warn("Problem Closing the S3Object[{}]: {}", s3Object.getKey(), e.getMessage());
+        }
+
+
+        try {
+            // Abort the stream
             this.is.abort();
         }
-        catch(Exception e) {
-            LOGGER.warn("S3Object[{}]: Issue Aborting Stream - {}", s3Object.getKey(), e.getMessage());
+        catch(Throwable e) {
+            LOGGER.warn("Problem Aborting S3Object[{}]: {}", s3Object.getKey(), e.getMessage());
         }
 
         // close the input Stream Safely
@@ -98,8 +128,8 @@ public class S3ObjectInputStreamWrapper extends InputStream
 
     protected void finalize( ) throws Throwable
     {
-        try
-        {
+        try {
+            // If the caller leaked this stream without closing it, release everything when the object is finalized
             ensureEverythingIsReleased();
             super.finalize();
         } catch(Exception e) {

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/1a03d5f6/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3OutputStreamWrapper.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3OutputStreamWrapper.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3OutputStreamWrapper.java
index 8f55983..c488b48 100644
--- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3OutputStreamWrapper.java
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3OutputStreamWrapper.java
@@ -9,33 +9,12 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.*;
-
-import com.amazonaws.services.s3.AmazonS3Client;
-import com.amazonaws.services.s3.model.ObjectMetadata;
-import com.amazonaws.services.s3.model.PutObjectResult;
-import com.amazonaws.services.s3.transfer.TransferManager;
-import com.amazonaws.services.s3.transfer.Upload;
-import org.apache.commons.io.FilenameUtils;
-import org.joda.time.DateTime;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.*;
-import java.util.Date;
 import java.util.Map;
 
 /**
- *
- * Author:  Smashew
- * Date:    2014-04-14
- *
- * Description:
- * This class uses ByteArrayOutputStreams to ensure files are written to S3 properly.
- *
- * There is a way to upload data in chunks (5mb or so) a peice, but the multi-part upload
- * is kind of a PITA to deal with.
- *
- * // TODO: This should be refactored to allow a user to specify if they want one large file instead of many small ones
+ * This class uses ByteArrayOutputStreams to ensure files are written to S3 properly. The stream is written to the
+ * in-memory ByteArrayOutputStream before it is finally written to Amazon S3. The maximum size the file may reach
+ * is directly controlled by the S3PersistWriter.
  */
 public class S3OutputStreamWrapper extends OutputStream
 {
@@ -47,11 +26,24 @@ public class S3OutputStreamWrapper extends OutputStream
     private final String fileName;
     private ByteArrayOutputStream outputStream;
     private final Map<String, String> metaData;
-
     private boolean isClosed = false;
 
-    public S3OutputStreamWrapper(AmazonS3Client amazonS3Client, String bucketName, String path, String fileName, Map<String, String> metaData) throws IOException
-    {
+    /**
+     * Create an OutputStream Wrapper
+     * @param amazonS3Client
+     * The Amazon S3 Client which will be handling the file
+     * @param bucketName
+     * The name of the bucket you wish to write to.
+     * @param path
+     * The path where the object will live
+     * @param fileName
+     * The file name you wish to write.
+     * @param metaData
+     * Any metadata that is to be written along with the object
+     * @throws IOException
+     * If there is an issue creating the stream
+     */
+    public S3OutputStreamWrapper(AmazonS3Client amazonS3Client, String bucketName, String path, String fileName, Map<String, String> metaData) throws IOException {
         this.amazonS3Client = amazonS3Client;
         this.bucketName = bucketName;
         this.path = path;
@@ -60,14 +52,21 @@ public class S3OutputStreamWrapper extends OutputStream
         this.outputStream = new ByteArrayOutputStream();
     }
 
-    /*
-     * The Methods that are overriden to support the 'OutputStream' object.
-     */
+    public void write(int b) throws IOException {
+        this.outputStream.write(b);
+    }
 
-    public void write(int b) throws IOException                         { this.outputStream.write(b); }
-    public void write(byte[] b) throws IOException                      { this.outputStream.write(b); }
-    public void write(byte[] b, int off, int len) throws IOException    { this.outputStream.write(b, off, len); }
-    public void flush() throws IOException                              { this.outputStream.flush(); }
+    public void write(byte[] b) throws IOException {
+        this.outputStream.write(b);
+    }
+
+    public void write(byte[] b, int off, int len) throws IOException {
+        this.outputStream.write(b, off, len);
+    }
+
+    public void flush() throws IOException {
+        this.outputStream.flush();
+    }
 
     /**
      * Whenever the output stream is closed we are going to kick the ByteArrayOutputStream off to Amazon S3.

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/1a03d5f6/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
index a987a47..938dc66 100644
--- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
@@ -15,11 +15,7 @@ import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.math.BigInteger;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Queue;
@@ -43,13 +39,33 @@ public class S3PersistReader implements StreamsPersistReader, DatumStatusCountab
     protected DatumStatusCounter countersTotal = new DatumStatusCounter();
     protected DatumStatusCounter countersCurrent = new DatumStatusCounter();
 
-    public AmazonS3Client getAmazonS3Client()                           { return this.amazonS3Client; }
-    public S3ReaderConfiguration getS3ReaderConfiguration()             { return this.s3ReaderConfiguration; }
-    public String getBucketName()                                       { return this.s3ReaderConfiguration.getBucket(); }
-    public StreamsResultSet readNew(BigInteger sequence)                { return null; }
-    public StreamsResultSet readRange(DateTime start, DateTime end)     { return null; }
-    public DatumStatusCounter getDatumStatusCounter()                   { return countersTotal; }
-    public Collection<String> getFiles()                                { return this.files; }
+    public AmazonS3Client getAmazonS3Client() {
+        return this.amazonS3Client;
+    }
+
+    public S3ReaderConfiguration getS3ReaderConfiguration() {
+        return this.s3ReaderConfiguration;
+    }
+
+    public String getBucketName() {
+        return this.s3ReaderConfiguration.getBucket();
+    }
+
+    public StreamsResultSet readNew(BigInteger sequence) {
+        return null;
+    }
+
+    public StreamsResultSet readRange(DateTime start, DateTime end) {
+        return null;
+    }
+
+    public DatumStatusCounter getDatumStatusCounter() {
+        return countersTotal;
+    }
+
+    public Collection<String> getFiles() {
+        return this.files;
+    }
 
     public S3PersistReader(S3ReaderConfiguration s3ReaderConfiguration) {
         this.s3ReaderConfiguration = s3ReaderConfiguration;
@@ -111,7 +127,9 @@ public class S3PersistReader implements StreamsPersistReader, DatumStatusCountab
         this.executor = Executors.newSingleThreadExecutor();
     }
 
-    public void cleanUp() { }
+    public void cleanUp() {
+        // no Op
+    }
 
     public StreamsResultSet readAll() {
         startStream();

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/1a03d5f6/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java
index 70015fb..5b4abe4 100644
--- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java
@@ -23,20 +23,17 @@ public class S3PersistReaderTask implements Runnable {
     @Override
     public void run() {
 
-        for(String file : reader.getFiles())
-        {
-            // Create our buffered reader
+        for(String file : reader.getFiles()) {
 
+            // Create our buffered reader
             S3ObjectInputStreamWrapper is = new S3ObjectInputStreamWrapper(reader.getAmazonS3Client().getObject(reader.getBucketName(), file));
             BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(is));
             LOGGER.info("Reading: {} ", file);
 
             String line = "";
             try {
-                while((line = bufferedReader.readLine()) != null)
-                {
-                    if( !Strings.isNullOrEmpty(line) )
-                    {
+                while((line = bufferedReader.readLine()) != null) {
+                    if( !Strings.isNullOrEmpty(line) ) {
                         reader.countersCurrent.incrementAttempt();
                         String[] fields = line.split(Character.toString(reader.DELIMITER));
                         StreamsDatum entry = new StreamsDatum(fields[3], fields[0]);
@@ -44,9 +41,7 @@ public class S3PersistReaderTask implements Runnable {
                         reader.countersCurrent.incrementStatus(DatumStatus.SUCCESS);
                     }
                 }
-            }
-            catch (Exception e)
-            {
+            } catch (Exception e) {
                 e.printStackTrace();
                 LOGGER.warn(e.getMessage());
                 reader.countersCurrent.incrementStatus(DatumStatus.FAIL);
@@ -57,7 +52,6 @@ public class S3PersistReaderTask implements Runnable {
             try {
                 closeSafely(file, is);
             } catch (Exception e) {
-                e.printStackTrace();
                 LOGGER.error(e.getMessage());
             }
         }
@@ -66,8 +60,7 @@ public class S3PersistReaderTask implements Runnable {
     private static void closeSafely(String file, Closeable closeable) {
         try {
             closeable.close();
-        }
-        catch(Exception e) {
+        } catch(Exception e) {
             LOGGER.error("There was an issue closing file: {}", file);
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/1a03d5f6/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java
index c46ff03..3685012 100644
--- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java
@@ -9,7 +9,6 @@ import com.amazonaws.services.s3.S3ClientOptions;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Strings;
-import com.google.common.util.concurrent.AtomicDouble;
 import org.apache.streams.core.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -188,7 +187,6 @@ public class S3PersistWriter implements StreamsPersistWriter, DatumStatusCountab
         }
     }
 
-
     private String convertResultToString(StreamsDatum entry)
     {
         String metadata = null;
@@ -227,8 +225,7 @@ public class S3PersistWriter implements StreamsPersistWriter, DatumStatusCountab
                 this.objectMapper = new ObjectMapper();
 
             // Create the credentials Object
-            if(this.amazonS3Client == null)
-            {
+            if(this.amazonS3Client == null) {
                 AWSCredentials credentials = new BasicAWSCredentials(s3WriterConfiguration.getKey(), s3WriterConfiguration.getSecretKey());
 
                 ClientConfiguration clientConfig = new ClientConfiguration();