You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2019/04/07 18:16:48 UTC
[streams] branch master updated: STREAMS-641 implement readAll() in
S3PersistReader
This is an automated email from the ASF dual-hosted git repository.
sblackmon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/streams.git
The following commit(s) were added to refs/heads/master by this push:
new f6f5919 STREAMS-641 implement readAll() in S3PersistReader
new 61e1d43 Merge pull request #479 from steveblackmon/STREAMS-641
f6f5919 is described below
commit f6f59199a5fd125bde4fe56f9efa955279b04a63
Author: Steve Blackmon <sb...@apache.org>
AuthorDate: Tue Mar 19 09:52:40 2019 -0500
STREAMS-641 implement readAll() in S3PersistReader
resolves STREAMS-641
---
.../main/java/org/apache/streams/s3/S3PersistReader.java | 16 ++++++++++++++--
1 file changed, 14 insertions(+), 2 deletions(-)
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
index 3a2cf3b..88e9bf9 100644
--- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
@@ -18,6 +18,8 @@
package org.apache.streams.s3;
+import org.apache.streams.config.StreamsConfiguration;
+import org.apache.streams.config.StreamsConfigurator;
import org.apache.streams.converter.LineReadWriteUtil;
import org.apache.streams.core.DatumStatusCountable;
import org.apache.streams.core.DatumStatusCounter;
@@ -62,6 +64,8 @@ public class S3PersistReader implements StreamsPersistReader, DatumStatusCountab
public static final String STREAMS_ID = "S3PersistReader";
protected static final char DELIMITER = '\t';
+ StreamsConfiguration streamsConfiguration;
+
private S3ReaderConfiguration s3ReaderConfiguration;
private AmazonS3Client amazonS3Client;
private ObjectMapper mapper = new ObjectMapper();
@@ -119,7 +123,10 @@ public class S3PersistReader implements StreamsPersistReader, DatumStatusCountab
@Override
public void prepare(Object configurationObject) {
+ streamsConfiguration = StreamsConfigurator.detectConfiguration();
+
lineReaderUtil = LineReadWriteUtil.getInstance(s3ReaderConfiguration);
+
// Connect to S3
synchronized (this) {
// Create the credentials Object
@@ -183,7 +190,7 @@ public class S3PersistReader implements StreamsPersistReader, DatumStatusCountab
LOGGER.error("There are no files to read");
}
- this.persistQueue = Queues.synchronizedQueue(new LinkedBlockingQueue<StreamsDatum>(10000));
+ this.persistQueue = Queues.synchronizedQueue(new LinkedBlockingQueue<StreamsDatum>(streamsConfiguration.getQueueSize().intValue()));
this.executor = Executors.newSingleThreadExecutor();
}
@@ -192,7 +199,12 @@ public class S3PersistReader implements StreamsPersistReader, DatumStatusCountab
}
public StreamsResultSet readAll() {
- startStream();
+ LOGGER.debug("readAll");
+ Thread thread = new Thread(new S3PersistReaderTask(this));
+ try {
+ thread.start();
+ thread.join(streamsConfiguration.getProviderTimeoutMs());
+ } catch( InterruptedException ie) {}
return new StreamsResultSet(persistQueue);
}