You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@streams.apache.org by sb...@apache.org on 2016/11/25 20:25:20 UTC
[39/42] incubator-streams git commit: STREAMS-440: custom checkstyle.xml, address compliance
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistWriter.java b/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistWriter.java
index 0a39461..b61a364 100644
--- a/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistWriter.java
+++ b/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistWriter.java
@@ -16,8 +16,15 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.streams.amazon.kinesis;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.converter.TypeConverterUtil;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsPersistWriter;
+
import com.amazonaws.ClientConfiguration;
import com.amazonaws.Protocol;
import com.amazonaws.auth.AWSCredentials;
@@ -30,11 +37,7 @@ import com.amazonaws.services.kinesis.model.PutRecordResult;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Strings;
import com.typesafe.config.Config;
-import org.apache.streams.config.ComponentConfigurator;
-import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.converter.TypeConverterUtil;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsPersistWriter;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,87 +51,94 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
- * Created by sblackmon on 9/2/15.
+ * KinesisPersistWriter writes documents to kinesis.
*/
public class KinesisPersistWriter implements StreamsPersistWriter {
- public final static String STREAMS_ID = "KinesisPersistWriter";
+ public static final String STREAMS_ID = "KinesisPersistWriter";
- private static final Logger LOGGER = LoggerFactory.getLogger(KinesisPersistWriter.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(KinesisPersistWriter.class);
- protected volatile Queue<StreamsDatum> persistQueue;
+ protected volatile Queue<StreamsDatum> persistQueue;
- private ObjectMapper mapper = new ObjectMapper();
+ private ObjectMapper mapper = new ObjectMapper();
- private KinesisWriterConfiguration config;
+ private KinesisWriterConfiguration config;
- private List<String> streamName;
+ private List<String> streamName;
- private ExecutorService executor;
+ private ExecutorService executor;
- protected AmazonKinesisClient client;
-
- public KinesisPersistWriter() {
- Config config = StreamsConfigurator.config.getConfig("kinesis");
- this.config = new ComponentConfigurator<>(KinesisWriterConfiguration.class).detectConfiguration(config);
- this.persistQueue = new ConcurrentLinkedQueue<StreamsDatum>();
- }
+ protected AmazonKinesisClient client;
- public KinesisPersistWriter(KinesisWriterConfiguration config) {
- this.config = config;
- this.persistQueue = new ConcurrentLinkedQueue<StreamsDatum>();
- }
+ /**
+ * KinesisPersistWriter constructor - resolves KinesisWriterConfiguration from JVM 'kinesis'.
+ */
+ public KinesisPersistWriter() {
+ Config config = StreamsConfigurator.config.getConfig("kinesis");
+ this.config = new ComponentConfigurator<>(KinesisWriterConfiguration.class).detectConfiguration(config);
+ this.persistQueue = new ConcurrentLinkedQueue<StreamsDatum>();
+ }
- public void setConfig(KinesisWriterConfiguration config) {
- this.config = config;
- }
+ /**
+ * KinesisPersistWriter constructor - uses provided KinesisWriterConfiguration.
+ */
+ public KinesisPersistWriter(KinesisWriterConfiguration config) {
+ this.config = config;
+ this.persistQueue = new ConcurrentLinkedQueue<StreamsDatum>();
+ }
- @Override
- public String getId() {
- return STREAMS_ID;
- }
+ public void setConfig(KinesisWriterConfiguration config) {
+ this.config = config;
+ }
- @Override
- public void write(StreamsDatum entry) {
+ @Override
+ public String getId() {
+ return STREAMS_ID;
+ }
- String document = (String) TypeConverterUtil.getInstance().convert(entry.getDocument(), String.class);
+ @Override
+ public void write(StreamsDatum entry) {
- PutRecordRequest putRecordRequest = new PutRecordRequest()
- .withStreamName(config.getStream())
- .withPartitionKey(entry.getId())
- .withData(ByteBuffer.wrap(document.getBytes()));
+ String document = (String) TypeConverterUtil.getInstance().convert(entry.getDocument(), String.class);
- PutRecordResult putRecordResult = client.putRecord(putRecordRequest);
+ PutRecordRequest putRecordRequest = new PutRecordRequest()
+ .withStreamName(config.getStream())
+ .withPartitionKey(entry.getId())
+ .withData(ByteBuffer.wrap(document.getBytes()));
- entry.setSequenceid(new BigInteger(putRecordResult.getSequenceNumber()));
+ PutRecordResult putRecordResult = client.putRecord(putRecordRequest);
- LOGGER.debug("Wrote {}", entry);
- }
+ entry.setSequenceid(new BigInteger(putRecordResult.getSequenceNumber()));
- @Override
- public void prepare(Object configurationObject) {
- // Connect to Kinesis
- synchronized (this) {
- // Create the credentials Object
- AWSCredentials credentials = new BasicAWSCredentials(config.getKey(), config.getSecretKey());
+ LOGGER.debug("Wrote {}", entry);
+ }
- ClientConfiguration clientConfig = new ClientConfiguration();
- clientConfig.setProtocol(Protocol.valueOf(config.getProtocol().toString()));
+ @Override
+ public void prepare(Object configurationObject) {
+ // Connect to Kinesis
+ synchronized (this) {
+ // Create the credentials Object
+ AWSCredentials credentials = new BasicAWSCredentials(config.getKey(), config.getSecretKey());
- this.client = new AmazonKinesisClient(credentials, clientConfig);
- if (!Strings.isNullOrEmpty(config.getRegion()))
- this.client.setRegion(Region.getRegion(Regions.fromName(config.getRegion())));
- }
- executor = Executors.newSingleThreadExecutor();
+ ClientConfiguration clientConfig = new ClientConfiguration();
+ clientConfig.setProtocol(Protocol.valueOf(config.getProtocol().toString()));
+ this.client = new AmazonKinesisClient(credentials, clientConfig);
+ if (!Strings.isNullOrEmpty(config.getRegion())) {
+ this.client.setRegion(Region.getRegion(Regions.fromName(config.getRegion())));
+ }
}
+ executor = Executors.newSingleThreadExecutor();
+
+ }
- @Override
- public void cleanUp() {
- try {
- executor.awaitTermination(5, TimeUnit.SECONDS);
- } catch (InterruptedException e) {
- LOGGER.debug("Interrupted! ", e);
- }
+ @Override
+ public void cleanUp() {
+ try {
+ executor.awaitTermination(5, TimeUnit.SECONDS);
+ } catch (InterruptedException ex) {
+ LOGGER.debug("Interrupted! ", ex);
}
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3ObjectInputStreamWrapper.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3ObjectInputStreamWrapper.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3ObjectInputStreamWrapper.java
index c13314d..f34782a 100644
--- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3ObjectInputStreamWrapper.java
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3ObjectInputStreamWrapper.java
@@ -15,10 +15,12 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.streams.s3;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectInputStream;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,125 +36,129 @@ import java.io.InputStream;
* and transfer the entire file. If you are only reading the first 50 lines of a 5,000,000 line file
* this becomes problematic.
*
+ * <p/>
* This class operates as a wrapper to fix the aforementioned nuances.
*
+ * <p/>
* Reference:
* http://stackoverflow.com/questions/17782937/connectionpooltimeoutexception-when-iterating-objects-in-s3
*/
-public class S3ObjectInputStreamWrapper extends InputStream
-{
- private final static Logger LOGGER = LoggerFactory.getLogger(S3ObjectInputStreamWrapper.class);
-
- private final S3Object s3Object;
- private final S3ObjectInputStream is;
- private boolean isClosed = false;
-
- /**
- * Create an input stream safely from
- * @param s3Object
- */
- public S3ObjectInputStreamWrapper(S3Object s3Object) {
- this.s3Object = s3Object;
- this.is = this.s3Object.getObjectContent();
- }
-
- public int hashCode() {
- return this.is.hashCode();
- }
-
- public boolean equals(Object obj) {
- return this.is.equals(obj);
- }
-
- public String toString() {
- return this.is.toString();
- }
-
- public int read() throws IOException {
- return this.is.read();
- }
-
- public int read(byte[] b) throws IOException {
- return this.is.read(b);
- }
-
- public int read(byte[] b, int off, int len) throws IOException {
- return this.is.read(b, off, len);
- }
-
- public long skip(long n) throws IOException {
- return this.is.skip(n);
- }
-
- public int available() throws IOException {
- return this.is.available();
- }
-
- public boolean markSupported() {
- return this.is.markSupported();
- }
-
- public synchronized void mark(int readlimit) {
- this.is.mark(readlimit);
- }
-
- public synchronized void reset() throws IOException {
- this.is.reset();
- }
-
- public void close() throws IOException {
- ensureEverythingIsReleased();
- }
+public class S3ObjectInputStreamWrapper extends InputStream {
- public void ensureEverythingIsReleased() {
- if(this.isClosed)
- return;
-
-
- try {
- // ensure that the S3 Object is closed properly.
- this.s3Object.close();
- } catch(Throwable e) {
- LOGGER.warn("Problem Closing the S3Object[{}]: {}", s3Object.getKey(), e.getMessage());
- }
-
-
- try {
- // Abort the stream
- this.is.abort();
- }
- catch(Throwable e) {
- LOGGER.warn("Problem Aborting S3Object[{}]: {}", s3Object.getKey(), e.getMessage());
- }
-
- // close the input Stream Safely
- closeSafely(this.is);
-
- // This corrects the issue with Open HTTP connections
- closeSafely(this.s3Object);
- this.isClosed = true;
- }
-
- private static void closeSafely(Closeable is) {
- try {
- if(is != null)
- is.close();
- } catch(Exception e) {
- e.printStackTrace();
- LOGGER.warn("S3InputStreamWrapper: Issue Closing Closeable - {}", e.getMessage());
- }
- }
+ private static final Logger LOGGER = LoggerFactory.getLogger(S3ObjectInputStreamWrapper.class);
+
+ private final S3Object s3Object;
+ private final S3ObjectInputStream is;
+ private boolean isClosed = false;
- protected void finalize( ) throws Throwable
- {
- try {
- // If there is an accidental leak where the user did not close, call this on the classes destructor
- ensureEverythingIsReleased();
- super.finalize();
- } catch(Exception e) {
- // this should never be called, just being very cautious
- LOGGER.warn("S3InputStreamWrapper: Issue Releasing Connections on Finalize - {}", e.getMessage());
- }
+ /**
+ * Create an input stream safely.
+ * @param s3Object s3Object
+ */
+ public S3ObjectInputStreamWrapper(S3Object s3Object) {
+ this.s3Object = s3Object;
+ this.is = this.s3Object.getObjectContent();
+ }
+
+ public int hashCode() {
+ return this.is.hashCode();
+ }
+
+ public boolean equals(Object obj) {
+ return this.is.equals(obj);
+ }
+
+ public String toString() {
+ return this.is.toString();
+ }
+
+ public int read() throws IOException {
+ return this.is.read();
+ }
+
+ public int read(byte[] byt) throws IOException {
+ return this.is.read(byt);
+ }
+
+ public int read(byte[] byt, int off, int len) throws IOException {
+ return this.is.read(byt, off, len);
+ }
+
+ public long skip(long skip) throws IOException {
+ return this.is.skip(skip);
+ }
+
+ public int available() throws IOException {
+ return this.is.available();
+ }
+
+ public boolean markSupported() {
+ return this.is.markSupported();
+ }
+
+ public synchronized void mark(int readlimit) {
+ this.is.mark(readlimit);
+ }
+
+ public synchronized void reset() throws IOException {
+ this.is.reset();
+ }
+
+ public void close() throws IOException {
+ ensureEverythingIsReleased();
+ }
+
+ /**
+ * ensureEverythingIsReleased as part of close process.
+ */
+ public void ensureEverythingIsReleased() {
+ if (this.isClosed) {
+ return;
+ }
+
+ try {
+ // ensure that the S3 Object is closed properly.
+ this.s3Object.close();
+ } catch (Throwable ex) {
+ LOGGER.warn("Problem Closing the S3Object[{}]: {}", s3Object.getKey(), ex.getMessage());
+ }
+
+
+ try {
+ // Abort the stream
+ this.is.abort();
+ } catch (Throwable ex) {
+ LOGGER.warn("Problem Aborting S3Object[{}]: {}", s3Object.getKey(), ex.getMessage());
+ }
+
+ // close the input Stream Safely
+ closeSafely(this.is);
+
+ // This corrects the issue with Open HTTP connections
+ closeSafely(this.s3Object);
+ this.isClosed = true;
+ }
+
+ private static void closeSafely(Closeable is) {
+ try {
+ if (is != null) {
+ is.close();
+ }
+ } catch (Exception ex) {
+ ex.printStackTrace();
+ LOGGER.warn("S3InputStreamWrapper: Issue Closing Closeable - {}", ex.getMessage());
+ }
+ }
+
+ protected void finalize() throws Throwable {
+ try {
+ // If there is an accidental leak where the user did not close, call this on the classes destructor
+ ensureEverythingIsReleased();
+ super.finalize();
+ } catch (Exception ex) {
+ // this should never be called, just being very cautious
+ LOGGER.warn("S3InputStreamWrapper: Issue Releasing Connections on Finalize - {}", ex.getMessage());
}
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3OutputStreamWrapper.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3OutputStreamWrapper.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3OutputStreamWrapper.java
index 08fc774..e8ca0c7 100644
--- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3OutputStreamWrapper.java
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3OutputStreamWrapper.java
@@ -15,17 +15,23 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.streams.s3;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.transfer.TransferManager;
import com.amazonaws.services.s3.transfer.Upload;
+
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.*;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
import java.util.Map;
/**
@@ -33,112 +39,109 @@ import java.util.Map;
* in memory ByteArrayOutPutStream before it is finally written to Amazon S3. The size the file is allowed to become
* is directly controlled by the S3PersistWriter.
*/
-public class S3OutputStreamWrapper extends OutputStream
-{
- private static final Logger LOGGER = LoggerFactory.getLogger(S3OutputStreamWrapper.class);
-
- private final AmazonS3Client amazonS3Client;
- private final String bucketName;
- private final String path;
- private final String fileName;
- private ByteArrayOutputStream outputStream;
- private final Map<String, String> metaData;
- private boolean isClosed = false;
-
- /**
- * Create an OutputStream Wrapper
- * @param amazonS3Client
- * The Amazon S3 Client which will be handling the file
- * @param bucketName
- * The Bucket Name you are wishing to write to.
- * @param path
- * The path where the object will live
- * @param fileName
- * The fileName you ware wishing to write.
- * @param metaData
- * Any meta data that is to be written along with the object
- * @throws IOException
- * If there is an issue creating the stream, this
- */
- public S3OutputStreamWrapper(AmazonS3Client amazonS3Client, String bucketName, String path, String fileName, Map<String, String> metaData) throws IOException {
- this.amazonS3Client = amazonS3Client;
- this.bucketName = bucketName;
- this.path = path;
- this.fileName = fileName;
- this.metaData = metaData;
- this.outputStream = new ByteArrayOutputStream();
+public class S3OutputStreamWrapper extends OutputStream {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(S3OutputStreamWrapper.class);
+
+ private final AmazonS3Client amazonS3Client;
+ private final String bucketName;
+ private final String path;
+ private final String fileName;
+ private ByteArrayOutputStream outputStream;
+ private final Map<String, String> metaData;
+ private boolean isClosed = false;
+
+ /**
+ * Create an OutputStream Wrapper
+ * @param amazonS3Client
+ * The Amazon S3 Client which will be handling the file
+ * @param bucketName
+ * The Bucket Name you are wishing to write to.
+ * @param path
+ * The path where the object will live
+ * @param fileName
+ * The fileName you ware wishing to write.
+ * @param metaData
+ * Any meta data that is to be written along with the object
+ * @throws IOException
+ * If there is an issue creating the stream, this
+ */
+ public S3OutputStreamWrapper(AmazonS3Client amazonS3Client, String bucketName, String path, String fileName, Map<String, String> metaData) throws IOException {
+ this.amazonS3Client = amazonS3Client;
+ this.bucketName = bucketName;
+ this.path = path;
+ this.fileName = fileName;
+ this.metaData = metaData;
+ this.outputStream = new ByteArrayOutputStream();
+ }
+
+ public void write(int byt) throws IOException {
+ this.outputStream.write(byt);
+ }
+
+ public void write(byte[] byt) throws IOException {
+ this.outputStream.write(byt);
+ }
+
+ public void write(byte[] byt, int off, int len) throws IOException {
+ this.outputStream.write(byt, off, len);
+ }
+
+ public void flush() throws IOException {
+ this.outputStream.flush();
+ }
+
+ /**
+ * Whenever the output stream is closed we are going to kick the ByteArrayOutputStream off to Amazon S3.
+ * @throws IOException
+ * Exception thrown from the FileOutputStream
+ */
+ public void close() throws IOException {
+ if (!isClosed) {
+ try {
+ this.addFile();
+ this.outputStream.close();
+ this.outputStream = null;
+ } catch (Exception ex) {
+ ex.printStackTrace();
+ LOGGER.warn("There was an error adding the temporaryFile to S3");
+ } finally {
+ // we are done here.
+ this.isClosed = true;
+ }
}
+ }
- public void write(int b) throws IOException {
- this.outputStream.write(b);
- }
+ private void addFile() throws Exception {
- public void write(byte[] b) throws IOException {
- this.outputStream.write(b);
- }
+ InputStream is = new ByteArrayInputStream(this.outputStream.toByteArray());
+ int contentLength = outputStream.size();
- public void write(byte[] b, int off, int len) throws IOException {
- this.outputStream.write(b, off, len);
- }
+ TransferManager transferManager = new TransferManager(amazonS3Client);
+ ObjectMetadata metadata = new ObjectMetadata();
+ metadata.setExpirationTime(DateTime.now().plusDays(365 * 3).toDate());
+ metadata.setContentLength(contentLength);
- public void flush() throws IOException {
- this.outputStream.flush();
- }
+ metadata.addUserMetadata("writer", "org.apache.streams");
- /**
- * Whenever the output stream is closed we are going to kick the ByteArrayOutputStream off to Amazon S3.
- * @throws IOException
- * Exception thrown from the FileOutputStream
- */
- public void close() throws IOException {
- if(!isClosed)
- {
- try
- {
- this.addFile();
- this.outputStream.close();
- this.outputStream = null;
- }
- catch(Exception e) {
- e.printStackTrace();
- LOGGER.warn("There was an error adding the temporaryFile to S3");
- }
- finally {
- // we are done here.
- this.isClosed = true;
- }
- }
+ for (String s : metaData.keySet()) {
+ metadata.addUserMetadata(s, metaData.get(s));
}
- private void addFile() throws Exception {
-
- InputStream is = new ByteArrayInputStream(this.outputStream.toByteArray());
- int contentLength = outputStream.size();
+ String fileNameToWrite = path + fileName;
+ Upload upload = transferManager.upload(bucketName, fileNameToWrite, is, metadata);
+ try {
+ upload.waitForUploadResult();
- TransferManager transferManager = new TransferManager(amazonS3Client);
- ObjectMetadata metadata = new ObjectMetadata();
- metadata.setExpirationTime(DateTime.now().plusDays(365*3).toDate());
- metadata.setContentLength(contentLength);
-
- metadata.addUserMetadata("writer", "org.apache.streams");
-
- for(String s : metaData.keySet())
- metadata.addUserMetadata(s, metaData.get(s));
-
- String fileNameToWrite = path + fileName;
- Upload upload = transferManager.upload(bucketName, fileNameToWrite, is, metadata);
- try {
- upload.waitForUploadResult();
-
- is.close();
- transferManager.shutdownNow(false);
- LOGGER.info("S3 File Close[{} kb] - {}", contentLength / 1024, path + fileName);
- } catch (Exception e) {
- // No Op
- }
+ is.close();
+ transferManager.shutdownNow(false);
+ LOGGER.info("S3 File Close[{} kb] - {}", contentLength / 1024, path + fileName);
+ } catch (Exception ignored) {
+ LOGGER.trace("Ignoring", ignored);
+ }
- }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
index 702df71..753b439 100644
--- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReader.java
@@ -15,8 +15,16 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.streams.s3;
+import org.apache.streams.converter.LineReadWriteUtil;
+import org.apache.streams.core.DatumStatusCountable;
+import org.apache.streams.core.DatumStatusCounter;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsPersistReader;
+import org.apache.streams.core.StreamsResultSet;
+
import com.amazonaws.ClientConfiguration;
import com.amazonaws.Protocol;
import com.amazonaws.auth.AWSCredentials;
@@ -31,12 +39,7 @@ import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Strings;
import com.google.common.collect.Queues;
-import org.apache.streams.converter.LineReadWriteUtil;
-import org.apache.streams.core.DatumStatusCountable;
-import org.apache.streams.core.DatumStatusCounter;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsPersistReader;
-import org.apache.streams.core.StreamsResultSet;
+
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,163 +53,168 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
+/**
+ * S3PersistReader reads documents from s3.
+ */
public class S3PersistReader implements StreamsPersistReader, DatumStatusCountable {
- private final static Logger LOGGER = LoggerFactory.getLogger(S3PersistReader.class);
- public final static String STREAMS_ID = "S3PersistReader";
- protected final static char DELIMITER = '\t';
-
- private S3ReaderConfiguration s3ReaderConfiguration;
- private AmazonS3Client amazonS3Client;
- private ObjectMapper mapper = new ObjectMapper();
- protected LineReadWriteUtil lineReaderUtil;
- private Collection<String> files;
- private ExecutorService executor;
- protected volatile Queue<StreamsDatum> persistQueue;
-
- protected DatumStatusCounter countersTotal = new DatumStatusCounter();
- protected DatumStatusCounter countersCurrent = new DatumStatusCounter();
- private Future<?> task;
-
- public AmazonS3Client getAmazonS3Client() {
- return this.amazonS3Client;
- }
-
- public S3ReaderConfiguration getS3ReaderConfiguration() {
- return this.s3ReaderConfiguration;
- }
-
- public String getBucketName() {
- return this.s3ReaderConfiguration.getBucket();
- }
-
- public StreamsResultSet readNew(BigInteger sequence) {
- return null;
- }
-
- public StreamsResultSet readRange(DateTime start, DateTime end) {
- return null;
- }
-
- @Override
- public boolean isRunning() {
- return !task.isDone() && !task.isCancelled();
- }
-
- public DatumStatusCounter getDatumStatusCounter() {
- return countersTotal;
+ private static final Logger LOGGER = LoggerFactory.getLogger(S3PersistReader.class);
+ public static final String STREAMS_ID = "S3PersistReader";
+ protected static final char DELIMITER = '\t';
+
+ private S3ReaderConfiguration s3ReaderConfiguration;
+ private AmazonS3Client amazonS3Client;
+ private ObjectMapper mapper = new ObjectMapper();
+ protected LineReadWriteUtil lineReaderUtil;
+ private Collection<String> files;
+ private ExecutorService executor;
+ protected volatile Queue<StreamsDatum> persistQueue;
+
+ protected DatumStatusCounter countersTotal = new DatumStatusCounter();
+ protected DatumStatusCounter countersCurrent = new DatumStatusCounter();
+ private Future<?> task;
+
+ public AmazonS3Client getAmazonS3Client() {
+ return this.amazonS3Client;
+ }
+
+ public S3ReaderConfiguration getS3ReaderConfiguration() {
+ return this.s3ReaderConfiguration;
+ }
+
+ public String getBucketName() {
+ return this.s3ReaderConfiguration.getBucket();
+ }
+
+ public StreamsResultSet readNew(BigInteger sequence) {
+ return null;
+ }
+
+ public StreamsResultSet readRange(DateTime start, DateTime end) {
+ return null;
+ }
+
+ @Override
+ public boolean isRunning() {
+ return !task.isDone() && !task.isCancelled();
+ }
+
+ public DatumStatusCounter getDatumStatusCounter() {
+ return countersTotal;
+ }
+
+ public Collection<String> getFiles() {
+ return this.files;
+ }
+
+ public S3PersistReader(S3ReaderConfiguration s3ReaderConfiguration) {
+ this.s3ReaderConfiguration = s3ReaderConfiguration;
+ }
+
+ @Override
+ public String getId() {
+ return STREAMS_ID;
+ }
+
+ @Override
+ public void prepare(Object configurationObject) {
+
+ lineReaderUtil = LineReadWriteUtil.getInstance(s3ReaderConfiguration);
+ // Connect to S3
+ synchronized (this) {
+ // Create the credentials Object
+ AWSCredentials credentials = new BasicAWSCredentials(s3ReaderConfiguration.getKey(), s3ReaderConfiguration.getSecretKey());
+
+ ClientConfiguration clientConfig = new ClientConfiguration();
+ clientConfig.setProtocol(Protocol.valueOf(s3ReaderConfiguration.getProtocol().toString()));
+
+ // We do not want path style access
+ S3ClientOptions clientOptions = new S3ClientOptions();
+ clientOptions.setPathStyleAccess(false);
+
+ this.amazonS3Client = new AmazonS3Client(credentials, clientConfig);
+ if ( !Strings.isNullOrEmpty(s3ReaderConfiguration.getRegion())) {
+ this.amazonS3Client.setRegion(Region.getRegion(Regions.fromName(s3ReaderConfiguration.getRegion())));
+ }
+ this.amazonS3Client.setS3ClientOptions(clientOptions);
}
- public Collection<String> getFiles() {
- return this.files;
- }
+ final ListObjectsRequest request = new ListObjectsRequest()
+ .withBucketName(this.s3ReaderConfiguration.getBucket())
+ .withPrefix(s3ReaderConfiguration.getReaderPath())
+ .withMaxKeys(500);
+
+
+ ObjectListing listing = this.amazonS3Client.listObjects(request);
+
+ this.files = new ArrayList<String>();
+
+ /**
+ * If you can list files that are in this path, then you must be dealing with a directory
+ * if you cannot list files that are in this path, then you are most likely dealing with
+ * a simple file.
+ */
+ boolean hasCommonPrefixes = listing.getCommonPrefixes().size() > 0 ? true : false;
+ boolean hasObjectSummaries = listing.getObjectSummaries().size() > 0 ? true : false;
+
+ if (hasCommonPrefixes || hasObjectSummaries) {
+ // Handle the 'directory' use case
+ do {
+ if (hasCommonPrefixes) {
+ for (String file : listing.getCommonPrefixes()) {
+ this.files.add(file);
+ }
+ } else {
+ for (final S3ObjectSummary objectSummary : listing.getObjectSummaries()) {
+ this.files.add(objectSummary.getKey());
+ }
+ }
- public S3PersistReader(S3ReaderConfiguration s3ReaderConfiguration) {
- this.s3ReaderConfiguration = s3ReaderConfiguration;
+ // get the next batch.
+ listing = this.amazonS3Client.listNextBatchOfObjects(listing);
+ }
+ while (listing.isTruncated());
+ } else {
+ // handle the single file use-case
+ this.files.add(s3ReaderConfiguration.getReaderPath());
}
- @Override
- public String getId() {
- return STREAMS_ID;
+ if (this.files.size() <= 0) {
+ LOGGER.error("There are no files to read");
}
- public void prepare(Object configurationObject) {
-
- lineReaderUtil = LineReadWriteUtil.getInstance(s3ReaderConfiguration);
- // Connect to S3
- synchronized (this)
- {
- // Create the credentials Object
- AWSCredentials credentials = new BasicAWSCredentials(s3ReaderConfiguration.getKey(), s3ReaderConfiguration.getSecretKey());
-
- ClientConfiguration clientConfig = new ClientConfiguration();
- clientConfig.setProtocol(Protocol.valueOf(s3ReaderConfiguration.getProtocol().toString()));
-
- // We do not want path style access
- S3ClientOptions clientOptions = new S3ClientOptions();
- clientOptions.setPathStyleAccess(false);
+ this.persistQueue = Queues.synchronizedQueue(new LinkedBlockingQueue<StreamsDatum>(10000));
+ this.executor = Executors.newSingleThreadExecutor();
+ }
- this.amazonS3Client = new AmazonS3Client(credentials, clientConfig);
- if( !Strings.isNullOrEmpty(s3ReaderConfiguration.getRegion()))
- this.amazonS3Client.setRegion(Region.getRegion(Regions.fromName(s3ReaderConfiguration.getRegion())));
- this.amazonS3Client.setS3ClientOptions(clientOptions);
- }
-
- final ListObjectsRequest request = new ListObjectsRequest()
- .withBucketName(this.s3ReaderConfiguration.getBucket())
- .withPrefix(s3ReaderConfiguration.getReaderPath())
- .withMaxKeys(500);
-
-
- ObjectListing listing = this.amazonS3Client.listObjects(request);
-
- this.files = new ArrayList<String>();
-
- /**
- * If you can list files that are in this path, then you must be dealing with a directory
- * if you cannot list files that are in this path, then you are most likely dealing with
- * a simple file.
- */
- boolean hasCommonPrefixes = listing.getCommonPrefixes().size() > 0 ? true : false;
- boolean hasObjectSummaries = listing.getObjectSummaries().size() > 0 ? true : false;
-
- if(hasCommonPrefixes || hasObjectSummaries) {
- // Handle the 'directory' use case
- do
- {
- if(hasCommonPrefixes) {
- for (String file : listing.getCommonPrefixes()) {
- this.files.add(file);
- }
- } else {
- for(final S3ObjectSummary objectSummary : listing.getObjectSummaries()) {
- this.files.add(objectSummary.getKey());
- }
- }
-
- // get the next batch.
- listing = this.amazonS3Client.listNextBatchOfObjects(listing);
- } while (listing.isTruncated());
- }
- else {
- // handle the single file use-case
- this.files.add(s3ReaderConfiguration.getReaderPath());
- }
-
- if(this.files.size() <= 0)
- LOGGER.error("There are no files to read");
-
- this.persistQueue = Queues.synchronizedQueue(new LinkedBlockingQueue<StreamsDatum>(10000));
- this.executor = Executors.newSingleThreadExecutor();
- }
-
- public void cleanUp() {
- // no Op
- }
+ public void cleanUp() {
+ // no Op
+ }
- public StreamsResultSet readAll() {
- startStream();
- return new StreamsResultSet(persistQueue);
- }
+ public StreamsResultSet readAll() {
+ startStream();
+ return new StreamsResultSet(persistQueue);
+ }
- public void startStream() {
- LOGGER.debug("startStream");
- task = executor.submit(new S3PersistReaderTask(this));
- }
+ public void startStream() {
+ LOGGER.debug("startStream");
+ task = executor.submit(new S3PersistReaderTask(this));
+ }
- public StreamsResultSet readCurrent() {
+ @Override
+ public StreamsResultSet readCurrent() {
- StreamsResultSet current;
+ StreamsResultSet current;
- synchronized( S3PersistReader.class ) {
- current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(persistQueue));
- current.setCounter(new DatumStatusCounter());
- current.getCounter().add(countersCurrent);
- countersTotal.add(countersCurrent);
- countersCurrent = new DatumStatusCounter();
- persistQueue.clear();
- }
- return current;
+ synchronized ( S3PersistReader.class ) {
+ current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(persistQueue));
+ current.setCounter(new DatumStatusCounter());
+ current.getCounter().add(countersCurrent);
+ countersTotal.add(countersCurrent);
+ countersCurrent = new DatumStatusCounter();
+ persistQueue.clear();
}
+ return current;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java
index f2f5567..f0e9626 100644
--- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java
@@ -15,12 +15,15 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.streams.s3;
-import com.google.common.base.Strings;
import org.apache.streams.core.DatumStatus;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.util.ComponentUtils;
+
+import com.google.common.base.Strings;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -28,57 +31,61 @@ import java.io.BufferedReader;
import java.io.Closeable;
import java.io.InputStreamReader;
+/**
+ * S3PersistReaderTask reads documents from s3 on behalf of
+ * @see org.apache.streams.s3.S3PersistReader
+ */
public class S3PersistReaderTask implements Runnable {
- private static final Logger LOGGER = LoggerFactory.getLogger(S3PersistReaderTask.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(S3PersistReaderTask.class);
- private S3PersistReader reader;
+ private S3PersistReader reader;
- public S3PersistReaderTask(S3PersistReader reader) {
- this.reader = reader;
- }
+ public S3PersistReaderTask(S3PersistReader reader) {
+ this.reader = reader;
+ }
- @Override
- public void run() {
-
- for(String file : reader.getFiles()) {
-
- // Create our buffered reader
- S3ObjectInputStreamWrapper is = new S3ObjectInputStreamWrapper(reader.getAmazonS3Client().getObject(reader.getBucketName(), file));
- BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(is));
- LOGGER.info("Reading: {} ", file);
-
- String line = "";
- try {
- while((line = bufferedReader.readLine()) != null) {
- if( !Strings.isNullOrEmpty(line) ) {
- reader.countersCurrent.incrementAttempt();
- StreamsDatum entry = reader.lineReaderUtil.processLine(line);
- ComponentUtils.offerUntilSuccess(entry, reader.persistQueue);
- reader.countersCurrent.incrementStatus(DatumStatus.SUCCESS);
- }
- }
- } catch (Exception e) {
- e.printStackTrace();
- LOGGER.warn(e.getMessage());
- reader.countersCurrent.incrementStatus(DatumStatus.FAIL);
- }
-
- LOGGER.info("Completed: " + file);
-
- try {
- closeSafely(file, is);
- } catch (Exception e) {
- LOGGER.error(e.getMessage());
- }
+ @Override
+ public void run() {
+
+ for (String file : reader.getFiles()) {
+
+ // Create our buffered reader
+ S3ObjectInputStreamWrapper is = new S3ObjectInputStreamWrapper(reader.getAmazonS3Client().getObject(reader.getBucketName(), file));
+ BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(is));
+ LOGGER.info("Reading: {} ", file);
+
+ String line = "";
+ try {
+ while ((line = bufferedReader.readLine()) != null) {
+ if ( !Strings.isNullOrEmpty(line) ) {
+ reader.countersCurrent.incrementAttempt();
+ StreamsDatum entry = reader.lineReaderUtil.processLine(line);
+ ComponentUtils.offerUntilSuccess(entry, reader.persistQueue);
+ reader.countersCurrent.incrementStatus(DatumStatus.SUCCESS);
+ }
}
+ } catch (Exception ex) {
+ ex.printStackTrace();
+ LOGGER.warn(ex.getMessage());
+ reader.countersCurrent.incrementStatus(DatumStatus.FAIL);
+ }
+
+ LOGGER.info("Completed: " + file);
+
+ try {
+ closeSafely(file, is);
+ } catch (Exception ex) {
+ LOGGER.error(ex.getMessage());
+ }
}
+ }
- private static void closeSafely(String file, Closeable closeable) {
- try {
- closeable.close();
- } catch(Exception e) {
- LOGGER.error("There was an issue closing file: {}", file);
- }
+ private static void closeSafely(String file, Closeable closeable) {
+ try {
+ closeable.close();
+ } catch (Exception ex) {
+ LOGGER.error("There was an issue closing file: {}", file);
}
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java
index 3686f55..ef6e831 100644
--- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java
@@ -15,8 +15,19 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.streams.s3;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.converter.LineReadWriteUtil;
+import org.apache.streams.core.DatumStatus;
+import org.apache.streams.core.DatumStatusCountable;
+import org.apache.streams.core.DatumStatusCounter;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsPersistWriter;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+
import com.amazonaws.ClientConfiguration;
import com.amazonaws.Protocol;
import com.amazonaws.auth.AWSCredentials;
@@ -28,15 +39,7 @@ import com.amazonaws.services.s3.S3ClientOptions;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
-import org.apache.streams.config.ComponentConfigurator;
-import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.converter.LineReadWriteUtil;
-import org.apache.streams.core.DatumStatus;
-import org.apache.streams.core.DatumStatusCountable;
-import org.apache.streams.core.DatumStatusCounter;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsPersistWriter;
-import org.apache.streams.jackson.StreamsJacksonMapper;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -53,239 +56,256 @@ import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
-public class S3PersistWriter implements StreamsPersistWriter, DatumStatusCountable
-{
- public final static String STREAMS_ID = "S3PersistWriter";
-
- private final static Logger LOGGER = LoggerFactory.getLogger(S3PersistWriter.class);
+/**
+ * S3PersistWriter writes documents to s3.
+ */
+public class S3PersistWriter implements StreamsPersistWriter, DatumStatusCountable {
- private final static char DELIMITER = '\t';
+ public static final String STREAMS_ID = "S3PersistWriter";
- private ObjectMapper objectMapper;
- private AmazonS3Client amazonS3Client;
- private S3WriterConfiguration s3WriterConfiguration;
- private final List<String> writtenFiles = new ArrayList<String>();
- protected LineReadWriteUtil lineWriterUtil;
+ private static final Logger LOGGER = LoggerFactory.getLogger(S3PersistWriter.class);
- private final AtomicLong totalBytesWritten = new AtomicLong();
- private AtomicLong bytesWrittenThisFile = new AtomicLong();
+ private static final char DELIMITER = '\t';
- private final AtomicInteger totalRecordsWritten = new AtomicInteger();
- private AtomicInteger fileLineCounter = new AtomicInteger();
+ private ObjectMapper objectMapper;
+ private AmazonS3Client amazonS3Client;
+ private S3WriterConfiguration s3WriterConfiguration;
+ private final List<String> writtenFiles = new ArrayList<String>();
+ protected LineReadWriteUtil lineWriterUtil;
- private Map<String, String> objectMetaData = new HashMap<String, String>() {{
- put("line[0]", "id");
- put("line[1]", "timeStamp");
- put("line[2]", "metaData");
- put("line[3]", "document");
- }};
+ private final AtomicLong totalBytesWritten = new AtomicLong();
+ private AtomicLong bytesWrittenThisFile = new AtomicLong();
- private OutputStreamWriter currentWriter = null;
+ private final AtomicInteger totalRecordsWritten = new AtomicInteger();
+ private AtomicInteger fileLineCounter = new AtomicInteger();
- public AmazonS3Client getAmazonS3Client() {
- return this.amazonS3Client;
- }
+ private static Map<String, String> objectMetaData = new HashMap<String, String>();
- public S3WriterConfiguration getS3WriterConfiguration() {
- return this.s3WriterConfiguration;
- }
+ static {
+ objectMetaData.put("line[0]", "id");
+ objectMetaData.put("line[1]", "timeStamp");
+ objectMetaData.put("line[2]", "metaData");
+ objectMetaData.put("line[3]", "document");
+ }
- public List<String> getWrittenFiles() {
- return this.writtenFiles;
- }
+ private OutputStreamWriter currentWriter = null;
- public Map<String, String> getObjectMetaData() {
- return this.objectMetaData;
- }
+ public AmazonS3Client getAmazonS3Client() {
+ return this.amazonS3Client;
+ }
- public ObjectMapper getObjectMapper() {
- return this.objectMapper;
- }
+ public S3WriterConfiguration getS3WriterConfiguration() {
+ return this.s3WriterConfiguration;
+ }
- public void setObjectMapper(ObjectMapper mapper) {
- this.objectMapper = mapper;
- }
+ public List<String> getWrittenFiles() {
+ return this.writtenFiles;
+ }
- public void setObjectMetaData(Map<String, String> val) {
- this.objectMetaData = val;
- }
+ public Map<String, String> getObjectMetaData() {
+ return this.objectMetaData;
+ }
- public S3PersistWriter() {
- this(new ComponentConfigurator<>(S3WriterConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig().getConfig("s3")));
- }
+ public ObjectMapper getObjectMapper() {
+ return this.objectMapper;
+ }
- public S3PersistWriter(S3WriterConfiguration s3WriterConfiguration) {
- this.s3WriterConfiguration = s3WriterConfiguration;
- }
+ public void setObjectMapper(ObjectMapper mapper) {
+ this.objectMapper = mapper;
+ }
- /**
- * Instantiator with a pre-existing amazonS3Client, this is used to help with re-use.
- * @param amazonS3Client
- * If you have an existing amazonS3Client, it wont' bother to create another one
- * @param s3WriterConfiguration
- * Configuration of the write paths and instructions are still required.
- */
- public S3PersistWriter(AmazonS3Client amazonS3Client, S3WriterConfiguration s3WriterConfiguration) {
- this.amazonS3Client = amazonS3Client;
- this.s3WriterConfiguration = s3WriterConfiguration;
- }
+ public void setObjectMetaData(Map<String, String> val) {
+ this.objectMetaData = val;
+ }
- @Override
- public String getId() {
- return STREAMS_ID;
- }
+ public S3PersistWriter() {
+ this(new ComponentConfigurator<>(S3WriterConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig().getConfig("s3")));
+ }
- @Override
- public void write(StreamsDatum streamsDatum) {
-
- synchronized (this) {
- // Check to see if we need to reset the file that we are currently working with
- if (this.currentWriter == null || ( this.bytesWrittenThisFile.get() >= (this.s3WriterConfiguration.getMaxFileSize() * 1024 * 1024))) {
- try {
- LOGGER.info("Resetting the file");
- this.currentWriter = resetFile();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- String line = lineWriterUtil.convertResultToString(streamsDatum);
-
- try {
- this.currentWriter.write(line);
- } catch (IOException e) {
- e.printStackTrace();
- }
-
- // add the bytes we've written
- int recordSize = line.getBytes().length;
- this.totalBytesWritten.addAndGet(recordSize);
- this.bytesWrittenThisFile.addAndGet(recordSize);
-
- // increment the record count
- this.totalRecordsWritten.incrementAndGet();
- this.fileLineCounter.incrementAndGet();
- }
+ public S3PersistWriter(S3WriterConfiguration s3WriterConfiguration) {
+ this.s3WriterConfiguration = s3WriterConfiguration;
+ }
- }
+ /**
+ * Instantiator with a pre-existing amazonS3Client, this is used to help with re-use.
+ * @param amazonS3Client
+ * If you have an existing amazonS3Client, it wont' bother to create another one
+ * @param s3WriterConfiguration
+ * Configuration of the write paths and instructions are still required.
+ */
+ public S3PersistWriter(AmazonS3Client amazonS3Client, S3WriterConfiguration s3WriterConfiguration) {
+ this.amazonS3Client = amazonS3Client;
+ this.s3WriterConfiguration = s3WriterConfiguration;
+ }
- public synchronized OutputStreamWriter resetFile() throws Exception {
- // this will keep it thread safe, so we don't create too many files
- if(this.fileLineCounter.get() == 0 && this.currentWriter != null)
- return this.currentWriter;
+ @Override
+ public String getId() {
+ return STREAMS_ID;
+ }
- closeAndDestroyWriter();
+ @Override
+ public void write(StreamsDatum streamsDatum) {
- // Create the path for where the file is going to live.
+ synchronized (this) {
+ // Check to see if we need to reset the file that we are currently working with
+ if (this.currentWriter == null || ( this.bytesWrittenThisFile.get() >= (this.s3WriterConfiguration.getMaxFileSize() * 1024 * 1024))) {
try {
- // generate a file name
- String fileName = this.s3WriterConfiguration.getWriterFilePrefix() +
- (this.s3WriterConfiguration.getChunk() ? "/" : "-") + new Date().getTime() + ".tsv";
-
- // create the output stream
- OutputStream outputStream = new S3OutputStreamWrapper(this.amazonS3Client,
- this.s3WriterConfiguration.getBucket(),
- this.s3WriterConfiguration.getWriterPath(),
- fileName,
- this.objectMetaData);
-
- // reset the counter
- this.fileLineCounter = new AtomicInteger();
- this.bytesWrittenThisFile = new AtomicLong();
-
- // add this to the list of written files
- writtenFiles.add(this.s3WriterConfiguration.getWriterPath() + fileName);
-
- // Log that we are creating this file
- LOGGER.info("File Created: Bucket[{}] - {}", this.s3WriterConfiguration.getBucket(), this.s3WriterConfiguration.getWriterPath() + fileName);
-
- // return the output stream
- return new OutputStreamWriter(outputStream);
- } catch (Exception e) {
- LOGGER.error(e.getMessage());
- throw e;
+ LOGGER.info("Resetting the file");
+ this.currentWriter = resetFile();
+ } catch (Exception ex) {
+ ex.printStackTrace();
}
- }
+ }
- private synchronized void closeAndDestroyWriter() {
- // if there is a current writer, we must close it first.
- if (this.currentWriter != null) {
- this.safeFlush(this.currentWriter);
- this.closeSafely(this.currentWriter);
- this.currentWriter = null;
+ String line = lineWriterUtil.convertResultToString(streamsDatum);
- // Logging of information to alert the user to the activities of this class
- LOGGER.debug("File Closed: Records[{}] Bytes[{}] {} ", this.fileLineCounter.get(), this.bytesWrittenThisFile.get(), this.writtenFiles.get(this.writtenFiles.size()-1));
- }
+ try {
+ this.currentWriter.write(line);
+ } catch (IOException ex) {
+ ex.printStackTrace();
+ }
+
+ // add the bytes we've written
+ int recordSize = line.getBytes().length;
+ this.totalBytesWritten.addAndGet(recordSize);
+ this.bytesWrittenThisFile.addAndGet(recordSize);
+
+ // increment the record count
+ this.totalRecordsWritten.incrementAndGet();
+ this.fileLineCounter.incrementAndGet();
}
- private synchronized void closeSafely(Writer writer) {
- if(writer != null) {
- try {
- writer.flush();
- writer.close();
- } catch(Exception e) {
- // noOp
- }
- LOGGER.debug("File Closed");
- }
+ }
+
+ /**
+ * Reset File when it's time to create a new file.
+ * @return OutputStreamWriter
+ * @throws Exception Exception
+ */
+ public synchronized OutputStreamWriter resetFile() throws Exception {
+ // this will keep it thread safe, so we don't create too many files
+ if (this.fileLineCounter.get() == 0 && this.currentWriter != null) {
+ return this.currentWriter;
}
- private void safeFlush(Flushable flushable) {
- // This is wrapped with a ByteArrayOutputStream, so this is really safe.
- if(flushable != null) {
- try {
- flushable.flush();
- } catch(IOException e) {
- // noOp
- }
- }
+ closeAndDestroyWriter();
+
+ // Create the path for where the file is going to live.
+ try {
+ // generate a file name
+ String fileName = this.s3WriterConfiguration.getWriterFilePrefix()
+ + (this.s3WriterConfiguration.getChunk() ? "/" : "-")
+ + new Date().getTime()
+ + ".tsv";
+
+ // create the output stream
+ OutputStream outputStream = new S3OutputStreamWrapper(this.amazonS3Client,
+ this.s3WriterConfiguration.getBucket(),
+ this.s3WriterConfiguration.getWriterPath(),
+ fileName,
+ this.objectMetaData);
+
+ // reset the counter
+ this.fileLineCounter = new AtomicInteger();
+ this.bytesWrittenThisFile = new AtomicLong();
+
+ // add this to the list of written files
+ writtenFiles.add(this.s3WriterConfiguration.getWriterPath() + fileName);
+
+ // Log that we are creating this file
+ LOGGER.info("File Created: Bucket[{}] - {}", this.s3WriterConfiguration.getBucket(), this.s3WriterConfiguration.getWriterPath() + fileName);
+
+ // return the output stream
+ return new OutputStreamWriter(outputStream);
+ } catch (Exception ex) {
+ LOGGER.error(ex.getMessage());
+ throw ex;
}
+ }
+
+ private synchronized void closeAndDestroyWriter() {
+ // if there is a current writer, we must close it first.
+ if (this.currentWriter != null) {
+ this.safeFlush(this.currentWriter);
+ this.closeSafely(this.currentWriter);
+ this.currentWriter = null;
- public void prepare(Object configurationObject) {
+ // Logging of information to alert the user to the activities of this class
+ LOGGER.debug("File Closed: Records[{}] Bytes[{}] {} ", this.fileLineCounter.get(), this.bytesWrittenThisFile.get(), this.writtenFiles.get(this.writtenFiles.size() - 1));
+ }
+ }
+
+ private synchronized void closeSafely(Writer writer) {
+ if (writer != null) {
+ try {
+ writer.flush();
+ writer.close();
+ } catch (Exception ex) {
+ LOGGER.trace("closeSafely", ex);
+ }
+ LOGGER.debug("File Closed");
+ }
+ }
+
+ private void safeFlush(Flushable flushable) {
+ // This is wrapped with a ByteArrayOutputStream, so this is really safe.
+ if (flushable != null) {
+ try {
+ flushable.flush();
+ } catch (IOException ex) {
+ LOGGER.trace("safeFlush", ex);
+ }
+ }
+ }
- lineWriterUtil = LineReadWriteUtil.getInstance(s3WriterConfiguration);
+ @Override
+ public void prepare(Object configurationObject) {
- // Connect to S3
- synchronized (this) {
+ lineWriterUtil = LineReadWriteUtil.getInstance(s3WriterConfiguration);
- try {
- // if the user has chosen to not set the object mapper, then set a default object mapper for them.
- if (this.objectMapper == null)
- this.objectMapper = StreamsJacksonMapper.getInstance();
+ // Connect to S3
+ synchronized (this) {
- // Create the credentials Object
- if (this.amazonS3Client == null) {
- AWSCredentials credentials = new BasicAWSCredentials(s3WriterConfiguration.getKey(), s3WriterConfiguration.getSecretKey());
+ try {
+ // if the user has chosen to not set the object mapper, then set a default object mapper for them.
+ if (this.objectMapper == null) {
+ this.objectMapper = StreamsJacksonMapper.getInstance();
+ }
- ClientConfiguration clientConfig = new ClientConfiguration();
- clientConfig.setProtocol(Protocol.valueOf(s3WriterConfiguration.getProtocol().toString()));
+ // Create the credentials Object
+ if (this.amazonS3Client == null) {
+ AWSCredentials credentials = new BasicAWSCredentials(s3WriterConfiguration.getKey(), s3WriterConfiguration.getSecretKey());
- // We do not want path style access
- S3ClientOptions clientOptions = new S3ClientOptions();
- clientOptions.setPathStyleAccess(false);
+ ClientConfiguration clientConfig = new ClientConfiguration();
+ clientConfig.setProtocol(Protocol.valueOf(s3WriterConfiguration.getProtocol().toString()));
- this.amazonS3Client = new AmazonS3Client(credentials, clientConfig);
- if (!Strings.isNullOrEmpty(s3WriterConfiguration.getRegion()))
- this.amazonS3Client.setRegion(Region.getRegion(Regions.fromName(s3WriterConfiguration.getRegion())));
- this.amazonS3Client.setS3ClientOptions(clientOptions);
- }
- } catch (Exception e) {
- LOGGER.error("Exception while preparing the S3 client: {}", e);
- }
+ // We do not want path style access
+ S3ClientOptions clientOptions = new S3ClientOptions();
+ clientOptions.setPathStyleAccess(false);
- Preconditions.checkArgument(this.amazonS3Client != null);
+ this.amazonS3Client = new AmazonS3Client(credentials, clientConfig);
+ if (!Strings.isNullOrEmpty(s3WriterConfiguration.getRegion())) {
+ this.amazonS3Client.setRegion(Region.getRegion(Regions.fromName(s3WriterConfiguration.getRegion())));
+ }
+ this.amazonS3Client.setS3ClientOptions(clientOptions);
}
- }
-
- public void cleanUp() {
- closeAndDestroyWriter();
- }
+ } catch (Exception ex) {
+ LOGGER.error("Exception while preparing the S3 client: {}", ex);
+ }
- public DatumStatusCounter getDatumStatusCounter() {
- DatumStatusCounter counters = new DatumStatusCounter();
- counters.incrementAttempt(this.totalRecordsWritten.get());
- counters.incrementStatus(DatumStatus.SUCCESS, this.totalRecordsWritten.get());
- return counters;
+ Preconditions.checkArgument(this.amazonS3Client != null);
}
+ }
+
+ public void cleanUp() {
+ closeAndDestroyWriter();
+ }
+
+ @Override
+ public DatumStatusCounter getDatumStatusCounter() {
+ DatumStatusCounter counters = new DatumStatusCounter();
+ counters.incrementAttempt(this.totalRecordsWritten.get());
+ counters.incrementStatus(DatumStatus.SUCCESS, this.totalRecordsWritten.get());
+ return counters;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistReader.java b/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistReader.java
index 8793333..43d9e34 100644
--- a/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistReader.java
+++ b/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistReader.java
@@ -18,102 +18,100 @@
package org.apache.streams.console;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Preconditions;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsPersistReader;
-import org.apache.streams.core.StreamsPersistWriter;
import org.apache.streams.core.StreamsResultSet;
+
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.BufferedInputStream;
import java.io.InputStream;
-import java.io.PrintStream;
import java.math.BigInteger;
import java.util.Queue;
import java.util.Scanner;
import java.util.concurrent.ConcurrentLinkedQueue;
+/**
+ * ConsolePersistReader reads documents from stdin.
+ */
public class ConsolePersistReader implements StreamsPersistReader {
- private final static String STREAMS_ID = "ConsolePersistReader";
+ private static final String STREAMS_ID = "ConsolePersistReader";
- private static final Logger LOGGER = LoggerFactory.getLogger(ConsolePersistReader.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(ConsolePersistReader.class);
- protected volatile Queue<StreamsDatum> persistQueue;
+ protected volatile Queue<StreamsDatum> persistQueue;
- protected InputStream inputStream = System.in;
+ protected InputStream inputStream = System.in;
- public ConsolePersistReader() {
- this.persistQueue = new ConcurrentLinkedQueue<StreamsDatum>();
- }
+ public ConsolePersistReader() {
+ this.persistQueue = new ConcurrentLinkedQueue<StreamsDatum>();
+ }
- public ConsolePersistReader(InputStream inputStream) {
- this();
- this.inputStream = inputStream;
- }
+ public ConsolePersistReader(InputStream inputStream) {
+ this();
+ this.inputStream = inputStream;
+ }
- @Override
- public String getId() {
- return STREAMS_ID;
- }
+ @Override
+ public String getId() {
+ return STREAMS_ID;
+ }
- public void prepare(Object o) {
+ public void prepare(Object configuration) {
- }
+ }
- public void cleanUp() {
+ public void cleanUp() {
- }
+ }
- @Override
- public void startStream() {
- // no op
- }
+ @Override
+ public void startStream() {
+ // no op
+ }
- @Override
- public StreamsResultSet readAll() {
- return readCurrent();
- }
+ @Override
+ public StreamsResultSet readAll() {
+ return readCurrent();
+ }
- @Override
- public StreamsResultSet readCurrent() {
+ @Override
+ public StreamsResultSet readCurrent() {
- LOGGER.info("{} readCurrent", STREAMS_ID);
+ LOGGER.info("{} readCurrent", STREAMS_ID);
- Scanner sc = new Scanner(inputStream);
+ Scanner sc = new Scanner(inputStream);
- while( sc.hasNextLine() ) {
+ while ( sc.hasNextLine() ) {
- persistQueue.offer(new StreamsDatum(sc.nextLine()));
+ persistQueue.offer(new StreamsDatum(sc.nextLine()));
- }
+ }
- LOGGER.info("Providing {} docs", persistQueue.size());
+ LOGGER.info("Providing {} docs", persistQueue.size());
- StreamsResultSet result = new StreamsResultSet(persistQueue);
+ StreamsResultSet result = new StreamsResultSet(persistQueue);
- LOGGER.info("{} Exiting", STREAMS_ID);
+ LOGGER.info("{} Exiting", STREAMS_ID);
- return result;
+ return result;
- }
+ }
- @Override
- public StreamsResultSet readNew(BigInteger sequence) {
- return readCurrent();
- }
+ @Override
+ public StreamsResultSet readNew(BigInteger sequence) {
+ return readCurrent();
+ }
- @Override
- public StreamsResultSet readRange(DateTime start, DateTime end) {
- return readCurrent();
- }
+ @Override
+ public StreamsResultSet readRange(DateTime start, DateTime end) {
+ return readCurrent();
+ }
- @Override
- public boolean isRunning() {
- return true; //Will always be running
- }
+ @Override
+ public boolean isRunning() {
+ return true; //Will always be running
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriter.java b/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriter.java
index 6d284ba..6358071 100644
--- a/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriter.java
+++ b/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriter.java
@@ -18,12 +18,14 @@
package org.apache.streams.console;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Preconditions;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsPersistWriter;
import org.apache.streams.jackson.StreamsJacksonMapper;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -31,53 +33,56 @@ import java.io.PrintStream;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
+/**
+ * ConsolePersistWriter writes documents to stdout.
+ */
public class ConsolePersistWriter implements StreamsPersistWriter {
- private final static String STREAMS_ID = "ConsolePersistWriter";
-
- private static final Logger LOGGER = LoggerFactory.getLogger(ConsolePersistWriter.class);
+ private static final String STREAMS_ID = "ConsolePersistWriter";
- protected PrintStream printStream = System.out;
+ private static final Logger LOGGER = LoggerFactory.getLogger(ConsolePersistWriter.class);
- protected volatile Queue<StreamsDatum> persistQueue;
+ protected PrintStream printStream = System.out;
- private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+ protected volatile Queue<StreamsDatum> persistQueue;
- public ConsolePersistWriter() {
- this.persistQueue = new ConcurrentLinkedQueue<StreamsDatum>();
- }
+ private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
- public ConsolePersistWriter(PrintStream printStream) {
- this();
- this.printStream = printStream;
- }
+ public ConsolePersistWriter() {
+ this.persistQueue = new ConcurrentLinkedQueue<StreamsDatum>();
+ }
- @Override
- public String getId() {
- return STREAMS_ID;
- }
+ public ConsolePersistWriter(PrintStream printStream) {
+ this();
+ this.printStream = printStream;
+ }
- public void prepare(Object o) {
- Preconditions.checkNotNull(persistQueue);
- }
+ @Override
+ public String getId() {
+ return STREAMS_ID;
+ }
- public void cleanUp() {
+ public void prepare(Object configuration) {
+ Preconditions.checkNotNull(persistQueue);
+ }
- }
+ public void cleanUp() {
- @Override
- public void write(StreamsDatum entry) {
+ }
- try {
+ @Override
+ public void write(StreamsDatum entry) {
- String text = mapper.writeValueAsString(entry);
+ try {
- printStream.println(text);
+ String text = mapper.writeValueAsString(entry);
- } catch (JsonProcessingException e) {
- LOGGER.warn("save: {}", e);
- }
+ printStream.println(text);
+ } catch (JsonProcessingException ex) {
+ LOGGER.warn("save: {}", ex);
}
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriterTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriterTask.java b/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriterTask.java
index e5009f0..ecb60e3 100644
--- a/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriterTask.java
+++ b/streams-contrib/streams-persist-console/src/main/java/org/apache/streams/console/ConsolePersistWriterTask.java
@@ -19,36 +19,43 @@
package org.apache.streams.console;
import org.apache.streams.core.StreamsDatum;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Random;
+/**
+ * ConsolePersistWriterTask writes documents to stdout on behalf of
+ * @see org.apache.streams.console.ConsolePersistWriter
+ */
public class ConsolePersistWriterTask implements Runnable {
- private static final Logger LOGGER = LoggerFactory.getLogger(ConsolePersistWriterTask.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(ConsolePersistWriterTask.class);
- private ConsolePersistWriter writer;
+ private ConsolePersistWriter writer;
- public ConsolePersistWriterTask(ConsolePersistWriter writer) {
- this.writer = writer;
- }
+ public ConsolePersistWriterTask(ConsolePersistWriter writer) {
+ this.writer = writer;
+ }
- @Override
- public void run() {
- while(true) {
- if( writer.persistQueue.peek() != null ) {
- try {
- StreamsDatum entry = writer.persistQueue.remove();
- writer.write(entry);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- try {
- Thread.sleep(new Random().nextInt(100));
- } catch (InterruptedException e) {}
+ @Override
+ public void run() {
+ while (true) {
+ if ( writer.persistQueue.peek() != null ) {
+ try {
+ StreamsDatum entry = writer.persistQueue.remove();
+ writer.write(entry);
+ } catch (Exception ex) {
+ ex.printStackTrace();
}
+ }
+ try {
+ Thread.sleep(new Random().nextInt(100));
+ } catch (InterruptedException interrupt) {
+ LOGGER.trace("Interrupted", interrupt);
+ }
}
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClient.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClient.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClient.java
index 0b2b782..8c7f724 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClient.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClient.java
@@ -21,21 +21,24 @@ package org.apache.streams.elasticsearch;
import org.elasticsearch.Version;
import org.elasticsearch.client.Client;
+/**
+ * Wrapper class for a client with a known version.
+ */
public class ElasticsearchClient {
- private Client client;
- private Version version;
+ private Client client;
+ private Version version;
- public ElasticsearchClient(Client client, Version version) {
- this.client = client;
- this.version = version;
- }
+ public ElasticsearchClient(Client client, Version version) {
+ this.client = client;
+ this.version = version;
+ }
- public Client getClient() {
- return client;
- }
+ public Client getClient() {
+ return client;
+ }
- public Version getVersion() {
- return version;
- }
+ public Version getVersion() {
+ return version;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClientManager.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClientManager.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClientManager.java
index 4809334..bdff9aa 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClientManager.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClientManager.java
@@ -19,6 +19,7 @@
package org.apache.streams.elasticsearch;
import com.google.common.net.InetAddresses;
+
import org.apache.commons.lang.builder.EqualsBuilder;
import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.commons.lang.builder.ToStringBuilder;
@@ -41,157 +42,154 @@ import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
+/**
+ * Wrapper class for multiple
+ * @see org.apache.streams.elasticsearch.ElasticsearchClient
+ */
public class ElasticsearchClientManager {
- private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchClientManager.class);
- private static Map<String, ElasticsearchClient> ALL_CLIENTS = new HashMap<>();
-
- private ElasticsearchConfiguration elasticsearchConfiguration;
-
- public ElasticsearchClientManager(ElasticsearchConfiguration elasticsearchConfiguration) {
- this.elasticsearchConfiguration = elasticsearchConfiguration;
- }
- public ElasticsearchConfiguration getElasticsearchConfiguration() {
- return elasticsearchConfiguration;
+ private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchClientManager.class);
+ private static Map<String, ElasticsearchClient> ALL_CLIENTS = new HashMap<>();
+
+ private ElasticsearchConfiguration elasticsearchConfiguration;
+
+ public ElasticsearchClientManager(ElasticsearchConfiguration elasticsearchConfiguration) {
+ this.elasticsearchConfiguration = elasticsearchConfiguration;
+ }
+
+ public ElasticsearchConfiguration getElasticsearchConfiguration() {
+ return elasticsearchConfiguration;
+ }
+
+ /**
+ * Get the Client for this return, it is actually a transport client, but it is much
+ * easier to work with the generic object as this interface likely won't change from
+ * elasticsearch. This method is synchronized to block threads from creating
+ * too many of these at any given time.
+ *
+ * @return Client for elasticsearch
+ */
+ public Client getClient() {
+ checkAndLoadClient(null);
+
+ return ALL_CLIENTS.get(this.elasticsearchConfiguration.getClusterName()).getClient();
+ }
+
+ /**
+ * Returns Client with clusterName.
+ * @param clusterName clusterName
+ */
+ public Client getClient(String clusterName) {
+ checkAndLoadClient(clusterName);
+
+ return ALL_CLIENTS.get(this.elasticsearchConfiguration.getClusterName()).getClient();
+ }
+
+ public boolean isOnOrAfterVersion(Version version) {
+ return ALL_CLIENTS.get(this.elasticsearchConfiguration.getClusterName()).getVersion().onOrAfter(version);
+ }
+
+ public boolean refresh(String index) {
+ return refresh(new String[]{index});
+ }
+
+ public boolean refresh(String[] indexes) {
+ RefreshResponse refreshResponse = this.getClient().admin().indices().prepareRefresh(indexes).execute().actionGet();
+ return refreshResponse.getFailedShards() == 0;
+ }
+
+ /**
+ * Terminate the elasticsearch clients.
+ */
+ public synchronized void stop() {
+ // Check to see if we have a client.
+ if (ALL_CLIENTS.containsKey(this.elasticsearchConfiguration.getClusterName())) {
+ // Close the client
+ ALL_CLIENTS.get(this.elasticsearchConfiguration.getClusterName()).getClient().close();
+
+ // Remove it so that it isn't in memory any more.
+ ALL_CLIENTS.remove(this.elasticsearchConfiguration.getClusterName());
}
+ }
- /**
- * ***********************************************************************************
- * Get the Client for this return, it is actually a transport client, but it is much
- * easier to work with the generic object as this interface likely won't change from
- * elasticsearch. This method is synchronized to block threads from creating
- * too many of these at any given time.
- *
- * @return Client for elasticsearch
- * ***********************************************************************************
- */
- public Client getClient() {
- checkAndLoadClient(null);
-
- return ALL_CLIENTS.get(this.elasticsearchConfiguration.getClusterName()).getClient();
- }
+ public ClusterHealthResponse getStatus() throws ExecutionException, InterruptedException {
+ ClusterHealthRequestBuilder request = this.getClient().admin().cluster().prepareHealth();
+ return request.execute().get();
+ }
- public Client getClient(String clusterName) {
- checkAndLoadClient(clusterName);
+ public String toString() {
+ return ToStringBuilder.reflectionToString(this);
+ }
- return ALL_CLIENTS.get(this.elasticsearchConfiguration.getClusterName()).getClient();
- }
+ public boolean equals(Object configuration) {
+ return EqualsBuilder.reflectionEquals(this, configuration, Collections.singletonList(this.elasticsearchConfiguration.toString()));
+ }
- public boolean isOnOrAfterVersion(Version version) {
- return ALL_CLIENTS.get(this.elasticsearchConfiguration.getClusterName()).getVersion().onOrAfter(version);
- }
+ public int hashCode() {
+ return HashCodeBuilder.reflectionHashCode(this, Collections.singletonList(this.elasticsearchConfiguration.toString()));
+ }
- public void start() throws Exception {
- /*
- * Note:
- * Everything in these classes is being switched to lazy loading. Within
- * Heroku you only have 60 seconds to connect, and bind to the service,
- * and you are only allowed to run in 1Gb of memory. Switching all
- * of this to lazy loading is how we are fixing some of the issues
- * if you are having issues with these classes, please, refactor
- * and create a UNIT TEST CASE!!!!!! To ensure that everything is
- * working before you check it back in.
- *
- * Author: Smashew @ 2013-08-26
- **********************************************************************/
- }
+ private synchronized void checkAndLoadClient(String clusterName) {
- public boolean refresh(String index) {
- return refresh(new String[]{index});
+ if (clusterName == null) {
+ clusterName = this.elasticsearchConfiguration.getClusterName();
}
- public boolean refresh(String[] indexes) {
- RefreshResponse refreshResponse = this.getClient().admin().indices().prepareRefresh(indexes).execute().actionGet();
- return refreshResponse.getFailedShards() == 0;
+ // If it is there, exit early
+ if (ALL_CLIENTS.containsKey(clusterName)) {
+ return;
}
- public synchronized void stop() {
- // Terminate the elasticsearch cluster
- // Check to see if we have a client.
- if (ALL_CLIENTS.containsKey(this.elasticsearchConfiguration.getClusterName())) {
- // Close the client
- ALL_CLIENTS.get(this.elasticsearchConfiguration.getClusterName()).getClient().close();
-
- // Remove it so that it isn't in memory any more.
- ALL_CLIENTS.remove(this.elasticsearchConfiguration.getClusterName());
+ try {
+ // We are currently using lazy loading to start the elasticsearch cluster, however.
+ LOGGER.info("Creating a new TransportClient: {}", this.elasticsearchConfiguration.getHosts());
+
+ Settings settings = Settings.settingsBuilder()
+ .put("cluster.name", this.elasticsearchConfiguration.getClusterName())
+ .put("client.transport.ping_timeout", "90s")
+ .put("client.transport.nodes_sampler_interval", "60s")
+ .build();
+
+
+ // Create the client
+ TransportClient transportClient = TransportClient.builder().settings(settings).build();
+ for (String h : elasticsearchConfiguration.getHosts()) {
+ LOGGER.info("Adding Host: {}", h);
+ InetAddress address;
+
+ if ( InetAddresses.isInetAddress(h)) {
+ LOGGER.info("{} is an IP address", h);
+ address = InetAddresses.forString(h);
+ } else {
+ LOGGER.info("{} is a hostname", h);
+ address = InetAddress.getByName(h);
}
- }
+ transportClient.addTransportAddress(
+ new InetSocketTransportAddress(
+ address,
+ elasticsearchConfiguration.getPort().intValue()));
+ }
- public ClusterHealthResponse getStatus() throws ExecutionException, InterruptedException {
- ClusterHealthRequestBuilder request = this.getClient().admin().cluster().prepareHealth();
- return request.execute().get();
- }
+ // Add the client and figure out the version.
+ ElasticsearchClient elasticsearchClient = new ElasticsearchClient(transportClient, getVersion(transportClient));
- public String toString() {
- return ToStringBuilder.reflectionToString(this);
- }
+ // Add it to our static map
+ ALL_CLIENTS.put(clusterName, elasticsearchClient);
- public boolean equals(Object o) {
- return EqualsBuilder.reflectionEquals(this, o, Collections.singletonList(this.elasticsearchConfiguration.toString()));
+ } catch (Exception ex) {
+ LOGGER.error("Could not Create elasticsearch Transport Client: {}", ex);
}
- public int hashCode() {
- return HashCodeBuilder.reflectionHashCode(this, Collections.singletonList(this.elasticsearchConfiguration.toString()));
- }
+ }
- private synchronized void checkAndLoadClient(String clusterName) {
-
- if (clusterName == null)
- clusterName = this.elasticsearchConfiguration.getClusterName();
-
- // If it is there, exit early
- if (ALL_CLIENTS.containsKey(clusterName))
- return;
-
- try {
- // We are currently using lazy loading to start the elasticsearch cluster, however.
- LOGGER.info("Creating a new TransportClient: {}", this.elasticsearchConfiguration.getHosts());
-
- Settings settings = Settings.settingsBuilder()
- .put("cluster.name", this.elasticsearchConfiguration.getClusterName())
- .put("client.transport.ping_timeout", "90s")
- .put("client.transport.nodes_sampler_interval", "60s")
- .build();
-
-
- // Create the client
- TransportClient transportClient = TransportClient.builder().settings(settings).build();
- for (String h : elasticsearchConfiguration.getHosts()) {
- LOGGER.info("Adding Host: {}", h);
- InetAddress address;
-
- if( InetAddresses.isInetAddress(h)) {
- LOGGER.info("{} is an IP address", h);
- address = InetAddresses.forString(h);
- } else {
- LOGGER.info("{} is a hostname", h);
- address = InetAddress.getByName(h);
- }
- transportClient.addTransportAddress(
- new InetSocketTransportAddress(
- address,
- elasticsearchConfiguration.getPort().intValue()));
- }
- // Add the client and figure out the version.
- ElasticsearchClient elasticsearchClient = new ElasticsearchClient(transportClient, getVersion(transportClient));
-
- // Add it to our static map
- ALL_CLIENTS.put(clusterName, elasticsearchClient);
-
- } catch (Exception e) {
- LOGGER.error("Could not Create elasticsearch Transport Client: {}", e);
- }
+ private Version getVersion(Client client) {
+ try {
+ ClusterStateRequestBuilder clusterStateRequestBuilder = client.admin().cluster().prepareState();
+ ClusterStateResponse clusterStateResponse = clusterStateRequestBuilder.execute().actionGet();
+ return clusterStateResponse.getState().getNodes().getMasterNode().getVersion();
+ } catch (Exception ex) {
+ return null;
}
-
- private Version getVersion(Client client) {
- try {
- ClusterStateRequestBuilder clusterStateRequestBuilder = client.admin().cluster().prepareState();
- ClusterStateResponse clusterStateResponse = clusterStateRequestBuilder.execute().actionGet();
-
- return clusterStateResponse.getState().getNodes().getMasterNode().getVersion();
- } catch (Exception e) {
- return null;
- }
- }
+ }
}