You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2019/04/07 18:16:48 UTC

[streams] branch master updated: STREAMS-641 implement readAll() in S3PersistReader

This is an automated email from the ASF dual-hosted git repository.

sblackmon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/streams.git


The following commit(s) were added to refs/heads/master by this push:
     new f6f5919  STREAMS-641 implement readAll() in S3PersistReader
     new 61e1d43  Merge pull request #479 from steveblackmon/STREAMS-641
f6f5919 is described below

commit f6f59199a5fd125bde4fe56f9efa955279b04a63
Author: Steve Blackmon <sb...@apache.org>
AuthorDate: Tue Mar 19 09:52:40 2019 -0500

    STREAMS-641 implement readAll() in S3PersistReader
    
    resolves STREAMS-641
---
 .../main/java/org/apache/streams/s3/S3PersistReader.java | 16 ++++++++++++++--
 1 file changed, 14 insertions(+), 2 deletions(-)

diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
index 3a2cf3b..88e9bf9 100644
--- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
@@ -18,6 +18,8 @@
 
 package org.apache.streams.s3;
 
+import org.apache.streams.config.StreamsConfiguration;
+import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.converter.LineReadWriteUtil;
 import org.apache.streams.core.DatumStatusCountable;
 import org.apache.streams.core.DatumStatusCounter;
@@ -62,6 +64,8 @@ public class S3PersistReader implements StreamsPersistReader, DatumStatusCountab
   public static final String STREAMS_ID = "S3PersistReader";
   protected static final char DELIMITER = '\t';
 
+  StreamsConfiguration streamsConfiguration;
+
   private S3ReaderConfiguration s3ReaderConfiguration;
   private AmazonS3Client amazonS3Client;
   private ObjectMapper mapper = new ObjectMapper();
@@ -119,7 +123,10 @@ public class S3PersistReader implements StreamsPersistReader, DatumStatusCountab
   @Override
   public void prepare(Object configurationObject) {
 
+    streamsConfiguration = StreamsConfigurator.detectConfiguration();
+
     lineReaderUtil = LineReadWriteUtil.getInstance(s3ReaderConfiguration);
+
     // Connect to S3
     synchronized (this) {
       // Create the credentials Object
@@ -183,7 +190,7 @@ public class S3PersistReader implements StreamsPersistReader, DatumStatusCountab
       LOGGER.error("There are no files to read");
     }
 
-    this.persistQueue = Queues.synchronizedQueue(new LinkedBlockingQueue<StreamsDatum>(10000));
+    this.persistQueue = Queues.synchronizedQueue(new LinkedBlockingQueue<StreamsDatum>(streamsConfiguration.getQueueSize().intValue()));
     this.executor = Executors.newSingleThreadExecutor();
   }
 
@@ -192,7 +199,12 @@ public class S3PersistReader implements StreamsPersistReader, DatumStatusCountab
   }
 
   public StreamsResultSet readAll() {
-    startStream();
+    LOGGER.debug("readAll");
+    Thread thread = new Thread(new S3PersistReaderTask(this));
+    try {
+      thread.start();
+      thread.join(streamsConfiguration.getProviderTimeoutMs());
+    } catch( InterruptedException ie) {}
     return new StreamsResultSet(persistQueue);
   }