Posted to commits@streams.apache.org by sb...@apache.org on 2016/11/25 20:25:15 UTC
[34/42] incubator-streams git commit: STREAMS-440: custom checkstyle.xml, address compliance
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
index 492eccb..4554c0f 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
+++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
@@ -18,13 +18,6 @@
package org.apache.streams.hdfs;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Strings;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.security.UserGroupInformation;
import org.apache.streams.config.ComponentConfigurator;
import org.apache.streams.config.StreamsConfigurator;
import org.apache.streams.converter.LineReadWriteUtil;
@@ -34,6 +27,15 @@ import org.apache.streams.core.DatumStatusCounter;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsPersistWriter;
import org.apache.streams.jackson.StreamsJacksonMapper;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Strings;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,249 +52,278 @@ import java.util.List;
import java.util.Queue;
import java.util.zip.GZIPOutputStream;
+/**
+ * WebHdfsPersistWriter writes to hdfs.
+ */
public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Closeable, DatumStatusCountable {
- public final static String STREAMS_ID = "WebHdfsPersistWriter";
-
- private final static Logger LOGGER = LoggerFactory.getLogger(WebHdfsPersistWriter.class);
-
- private FileSystem client;
- private Path path;
- private int linesPerFile;
- private int totalRecordsWritten = 0;
- private final List<Path> writtenFiles = new ArrayList<Path>();
- private int fileLineCounter = 0;
- private OutputStreamWriter currentWriter = null;
-
- private static final int BYTES_IN_MB = 1024 * 1024;
- private static final int BYTES_BEFORE_FLUSH = 64 * BYTES_IN_MB;
- private volatile int totalByteCount = 0;
- private volatile int byteCount = 0;
-
- public boolean terminate = false;
-
- protected volatile Queue<StreamsDatum> persistQueue;
-
- private ObjectMapper mapper;
- private LineReadWriteUtil lineWriterUtil;
-
- protected HdfsWriterConfiguration hdfsConfiguration;
-
- public WebHdfsPersistWriter() {
- this(new ComponentConfigurator<>(HdfsWriterConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig().getConfig("hdfs")));
- }
-
- public WebHdfsPersistWriter(HdfsWriterConfiguration hdfsConfiguration) {
- this.hdfsConfiguration = hdfsConfiguration;
- this.linesPerFile = hdfsConfiguration.getLinesPerFile().intValue();
+ public static final String STREAMS_ID = "WebHdfsPersistWriter";
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(WebHdfsPersistWriter.class);
+
+ private FileSystem client;
+ private Path path;
+ private int linesPerFile;
+ private int totalRecordsWritten = 0;
+ private final List<Path> writtenFiles = new ArrayList<Path>();
+ private int fileLineCounter = 0;
+ private OutputStreamWriter currentWriter = null;
+
+ private static final int BYTES_IN_MB = 1024 * 1024;
+ private static final int BYTES_BEFORE_FLUSH = 64 * BYTES_IN_MB;
+ private volatile int totalByteCount = 0;
+ private volatile int byteCount = 0;
+
+ public boolean terminate = false;
+
+ protected volatile Queue<StreamsDatum> persistQueue;
+
+ private ObjectMapper mapper;
+ private LineReadWriteUtil lineWriterUtil;
+
+ protected HdfsWriterConfiguration hdfsConfiguration;
+
+ public WebHdfsPersistWriter() {
+ this(new ComponentConfigurator<>(HdfsWriterConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig().getConfig("hdfs")));
+ }
+
+ public WebHdfsPersistWriter(HdfsWriterConfiguration hdfsConfiguration) {
+ this.hdfsConfiguration = hdfsConfiguration;
+ this.linesPerFile = hdfsConfiguration.getLinesPerFile().intValue();
+ }
+
+ /**
+ * getURI from hdfsConfiguration.
+ * @return URI
+ * @throws URISyntaxException URISyntaxException
+ */
+ // TODO: combine with WebHdfsPersistReader.getURI
+ public URI getURI() throws URISyntaxException {
+ StringBuilder uriBuilder = new StringBuilder();
+ uriBuilder.append(hdfsConfiguration.getScheme());
+ uriBuilder.append("://");
+ if ( !Strings.isNullOrEmpty(hdfsConfiguration.getHost())) {
+ uriBuilder.append(hdfsConfiguration.getHost() + ":" + hdfsConfiguration.getPort());
+ } else {
+ uriBuilder.append("/");
}
-
- public URI getURI() throws URISyntaxException {
- StringBuilder uriBuilder = new StringBuilder();
- uriBuilder.append(hdfsConfiguration.getScheme());
- uriBuilder.append("://");
- if( !Strings.isNullOrEmpty(hdfsConfiguration.getHost()))
- uriBuilder.append(hdfsConfiguration.getHost() + ":" + hdfsConfiguration.getPort());
- else
- uriBuilder.append("/");
- return new URI(uriBuilder.toString());
+ return new URI(uriBuilder.toString());
+ }
+
+ /**
+ * isConnected.
+ * @return true if connected, false otherwise
+ */
+ // TODO: combine with WebHdfsPersistReader.isConnected
+ public boolean isConnected() {
+ return (client != null);
+ }
+
+ /**
+ * getFileSystem.
+ * @return FileSystem
+ */
+ // TODO: combine with WebHdfsPersistReader.getFileSystem
+ public final synchronized FileSystem getFileSystem() {
+ // Check to see if we are connected.
+ if (!isConnected()) {
+ connectToWebHDFS();
}
-
- public boolean isConnected() {
- return (client != null);
- }
-
- public final synchronized FileSystem getFileSystem() {
- // Check to see if we are connected.
- if (!isConnected())
- connectToWebHDFS();
- return this.client;
- }
-
- private synchronized void connectToWebHDFS() {
- try {
- LOGGER.info("User : {}", this.hdfsConfiguration.getUser());
- UserGroupInformation ugi = UserGroupInformation.createRemoteUser(this.hdfsConfiguration.getUser());
- ugi.setAuthenticationMethod(UserGroupInformation.AuthenticationMethod.SIMPLE);
-
- ugi.doAs(new PrivilegedExceptionAction<Void>() {
- public Void run() throws Exception {
- Configuration conf = new Configuration();
- conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
- LOGGER.info("WebURI : {}", getURI().toString());
- client = FileSystem.get(getURI(), conf);
- LOGGER.info("Connected to WebHDFS");
-
- /*
- * ************************************************************************************************
- * This code is an example of how you would work with HDFS and you weren't going over
- * the webHDFS protocol.
- *
- * Smashew: 2013-10-01
- * ************************************************************************************************
- conf.set("fs.defaultFS", "hdfs://hadoop.mdigitallife.com:8020/user/" + userName);
- conf.set("namenode.host","0.0.0.0");
- conf.set("hadoop.job.ugi", userName);
- conf.set(DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY, "runner");
- fileSystem.createNewFile(new Path("/user/"+ userName + "/test"));
- FileStatus[] status = fs.listStatus(new Path("/user/" + userName));
- for(int i=0;i<status.length;i++)
- {
- LOGGER.info("Directory: {}", status[i].getPath());
- }
- */
- return null;
- }
- });
- } catch (Exception e) {
- LOGGER.error("There was an error connecting to WebHDFS, please check your settings and try again", e);
- throw new RuntimeException(e);
+ return this.client;
+ }
+
+ private synchronized void connectToWebHDFS() {
+ try {
+ LOGGER.info("User : {}", this.hdfsConfiguration.getUser());
+ UserGroupInformation ugi = UserGroupInformation.createRemoteUser(this.hdfsConfiguration.getUser());
+ ugi.setAuthenticationMethod(UserGroupInformation.AuthenticationMethod.SIMPLE);
+
+ ugi.doAs(new PrivilegedExceptionAction<Void>() {
+ public Void run() throws Exception {
+ Configuration conf = new Configuration();
+ conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
+ LOGGER.info("WebURI : {}", getURI().toString());
+ client = FileSystem.get(getURI(), conf);
+ LOGGER.info("Connected to WebHDFS");
+
+ /*
+ * ************************************************************************************************
+ * This code is an example of how you would work with HDFS if you weren't going over
+ * the webHDFS protocol.
+ *
+ * Smashew: 2013-10-01
+ * ************************************************************************************************
+ conf.set("fs.defaultFS", "hdfs://hadoop.mdigitallife.com:8020/user/" + userName);
+ conf.set("namenode.host","0.0.0.0");
+ conf.set("hadoop.job.ugi", userName);
+ conf.set(DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY, "runner");
+ fileSystem.createNewFile(new Path("/user/"+ userName + "/test"));
+ FileStatus[] status = fs.listStatus(new Path("/user/" + userName));
+ for(int i=0;i<status.length;i++)
+ {
+ LOGGER.info("Directory: {}", status[i].getPath());
+ }
+ */
+
+ return null;
}
+ });
+ } catch (Exception ex) {
+ LOGGER.error("There was an error connecting to WebHDFS, please check your settings and try again", ex);
+ throw new RuntimeException(ex);
}
-
- @Override
- public String getId() {
- return STREAMS_ID;
- }
-
- @Override
- public void write(StreamsDatum streamsDatum) {
-
- synchronized (this) {
- // Check to see if we need to reset the file that we are currently working with
- if (this.currentWriter == null || (this.fileLineCounter > this.linesPerFile))
- resetFile();
-
- String line = lineWriterUtil.convertResultToString(streamsDatum);
- writeInternal(line);
- if( !line.endsWith(this.hdfsConfiguration.getLineDelimiter()))
- writeInternal(this.hdfsConfiguration.getLineDelimiter());
- int bytesInLine = line.getBytes().length;
-
- totalRecordsWritten++;
- totalByteCount += bytesInLine;
- byteCount += bytesInLine;
-
- if (byteCount > BYTES_BEFORE_FLUSH)
- try {
- flush();
- } catch (IOException e) {
- LOGGER.warn("Error flushing to HDFS. Creating a new file and continuing execution. WARNING: There could be data loss.", e);
- }
-
- this.fileLineCounter++;
+ }
+
+ @Override
+ public String getId() {
+ return STREAMS_ID;
+ }
+
+ @Override
+ public void write(StreamsDatum streamsDatum) {
+
+ synchronized (this) {
+ // Check to see if we need to reset the file that we are currently working with
+ if (this.currentWriter == null || (this.fileLineCounter > this.linesPerFile)) {
+ resetFile();
+ }
+ String line = lineWriterUtil.convertResultToString(streamsDatum);
+ writeInternal(line);
+ if ( !line.endsWith(this.hdfsConfiguration.getLineDelimiter())) {
+ writeInternal(this.hdfsConfiguration.getLineDelimiter());
+ }
+ int bytesInLine = line.getBytes().length;
+
+ totalRecordsWritten++;
+ totalByteCount += bytesInLine;
+ byteCount += bytesInLine;
+
+ if (byteCount > BYTES_BEFORE_FLUSH) {
+ try {
+ flush();
+ } catch (IOException ex) {
+ LOGGER.warn("Error flushing to HDFS. Creating a new file and continuing execution. WARNING: There could be data loss.", ex);
}
+ }
+ this.fileLineCounter++;
}
-
- private void writeInternal(String line) {
+ }
+
+ private void writeInternal(String line) {
+ try {
+ this.currentWriter.write(line);
+ } catch (IOException ex) {
+ LOGGER.warn("Error writing to HDFS. Attempting to try a new file", ex);
+ try {
+ resetFile();
+ this.currentWriter.write(line);
+ } catch (Exception e2) {
+ LOGGER.warn("Failed to write even after creating a new file. Attempting to reconnect", e2);
try {
- this.currentWriter.write(line);
- } catch (IOException e) {
- LOGGER.warn("Error writing to HDFS. Attempting to try a new file", e);
- try{
- resetFile();
- this.currentWriter.write(line);
- } catch (Exception io) {
- LOGGER.warn("Failed to write even after creating a new file. Attempting to reconnect", io);
- try {
- connectToWebHDFS();
- resetFile();
- this.currentWriter.write(line);
- } catch (Exception ex) {
- LOGGER.error("Failed to write to HDFS after reconnecting client. Terminating writer.", ex);
- throw new RuntimeException(e);
- }
- }
-
+ connectToWebHDFS();
+ resetFile();
+ this.currentWriter.write(line);
+ } catch (Exception e3) {
+ LOGGER.error("Failed to write to HDFS after reconnecting client. Terminating writer.", e3);
+ throw new RuntimeException(e3);
}
- }
+ }
- public void flush() throws IOException {
- if (this.currentWriter != null && byteCount > BYTES_BEFORE_FLUSH) {
- this.currentWriter.flush();
- byteCount = 0;
- }
}
+ }
- private synchronized void resetFile() {
- // this will keep it thread safe, so we don't create too many files
- if (this.fileLineCounter == 0 && this.currentWriter != null)
- return;
+ @Override
+ public void flush() throws IOException {
+ if (this.currentWriter != null && byteCount > BYTES_BEFORE_FLUSH) {
+ this.currentWriter.flush();
+ byteCount = 0;
+ }
+ }
- // Create the path for where the file is going to live.
- Path filePath = this.path.suffix("/" + hdfsConfiguration.getWriterFilePrefix() + "-" + new Date().getTime());
+ private synchronized void resetFile() {
+ // this will keep it thread safe, so we don't create too many files
+ if (this.fileLineCounter == 0 && this.currentWriter != null) {
+ return;
+ }
- if( hdfsConfiguration.getCompression().equals(HdfsWriterConfiguration.Compression.GZIP))
- filePath = filePath.suffix(".gz");
- else
- filePath = filePath.suffix(".tsv");
+ // Create the path for where the file is going to live.
+ Path filePath = this.path.suffix("/" + hdfsConfiguration.getWriterFilePrefix() + "-" + new Date().getTime());
- try {
+ if ( hdfsConfiguration.getCompression().equals(HdfsWriterConfiguration.Compression.GZIP)) {
+ filePath = filePath.suffix(".gz");
+ } else {
+ filePath = filePath.suffix(".tsv");
+ }
- // if there is a current writer, we must close it first.
- if (this.currentWriter != null) {
- flush();
- close();
- }
+ try {
- this.fileLineCounter = 0;
+ // if there is a current writer, we must close it first.
+ if (this.currentWriter != null) {
+ flush();
+ close();
+ }
- // Check to see if a file of the same name exists, if it does, then we are not going to be able to proceed.
- if (client.exists(filePath))
- throw new RuntimeException("Unable to create file: " + filePath);
+ this.fileLineCounter = 0;
- if( hdfsConfiguration.getCompression().equals(HdfsWriterConfiguration.Compression.GZIP))
- this.currentWriter = new OutputStreamWriter(new GZIPOutputStream(client.create(filePath)));
- else
- this.currentWriter = new OutputStreamWriter(client.create(filePath));
+ // Check to see if a file of the same name exists, if it does, then we are not going to be able to proceed.
+ if (client.exists(filePath)) {
+ throw new RuntimeException("Unable to create file: " + filePath);
+ }
- // Add another file to the list of written files.
- writtenFiles.add(filePath);
+ if ( hdfsConfiguration.getCompression().equals(HdfsWriterConfiguration.Compression.GZIP)) {
+ this.currentWriter = new OutputStreamWriter(new GZIPOutputStream(client.create(filePath)));
+ } else {
+ this.currentWriter = new OutputStreamWriter(client.create(filePath));
+ }
- LOGGER.info("File Created: {}", filePath);
- } catch (Exception e) {
- LOGGER.error("COULD NOT CreateFile: {}", filePath);
- LOGGER.error(e.getMessage());
- throw new RuntimeException(e);
- }
- }
+ // Add another file to the list of written files.
+ writtenFiles.add(filePath);
- public synchronized void close() throws IOException {
- if (this.currentWriter != null) {
- this.currentWriter.flush();
- this.currentWriter.close();
- this.currentWriter = null;
- LOGGER.info("File Closed");
- }
+ LOGGER.info("File Created: {}", filePath);
+ } catch (Exception ex) {
+ LOGGER.error("COULD NOT CreateFile: {}", filePath);
+ LOGGER.error(ex.getMessage());
+ throw new RuntimeException(ex);
}
-
- @Override
- public void prepare(Object configurationObject) {
- mapper = StreamsJacksonMapper.getInstance();
- lineWriterUtil = LineReadWriteUtil.getInstance(hdfsConfiguration);
- connectToWebHDFS();
- path = new Path(hdfsConfiguration.getPath() + "/" + hdfsConfiguration.getWriterPath());
+ }
+
+ @Override
+ public synchronized void close() throws IOException {
+ if (this.currentWriter != null) {
+ this.currentWriter.flush();
+ this.currentWriter.close();
+ this.currentWriter = null;
+ LOGGER.info("File Closed");
}
-
- @Override
- public void cleanUp() {
- try {
- flush();
- } catch (IOException e) {
- LOGGER.error("Error flushing on cleanup", e);
- }
- try {
- close();
- } catch (IOException e) {
- LOGGER.error("Error closing on cleanup", e);
- }
+ }
+
+ @Override
+ public void prepare(Object configurationObject) {
+ mapper = StreamsJacksonMapper.getInstance();
+ lineWriterUtil = LineReadWriteUtil.getInstance(hdfsConfiguration);
+ connectToWebHDFS();
+ path = new Path(hdfsConfiguration.getPath() + "/" + hdfsConfiguration.getWriterPath());
+ }
+
+ @Override
+ public void cleanUp() {
+ try {
+ flush();
+ } catch (IOException ex) {
+ LOGGER.error("Error flushing on cleanup", ex);
}
-
- @Override
- public DatumStatusCounter getDatumStatusCounter() {
- DatumStatusCounter counters = new DatumStatusCounter();
- counters.incrementAttempt(this.totalRecordsWritten);
- counters.incrementStatus(DatumStatus.SUCCESS, this.totalRecordsWritten);
- return counters;
+ try {
+ close();
+ } catch (IOException ex) {
+ LOGGER.error("Error closing on cleanup", ex);
}
+ }
+
+ @Override
+ public DatumStatusCounter getDatumStatusCounter() {
+ DatumStatusCounter counters = new DatumStatusCounter();
+ counters.incrementAttempt(this.totalRecordsWritten);
+ counters.incrementStatus(DatumStatus.SUCCESS, this.totalRecordsWritten);
+ return counters;
+ }
}
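
For orientation, a minimal usage sketch of the refactored WebHdfsPersistWriter above. This is not part of the commit: the HdfsWriterConfiguration setters and all values are illustrative assumptions (the diff shows only the getters), and prepare() ignores its argument, using the configuration supplied at construction time.

    // Hypothetical wiring; setter names and values are assumptions.
    HdfsWriterConfiguration config = new HdfsWriterConfiguration();
    config.setLinesPerFile(20000L);        // rotate to a new file after this many lines
    config.setWriterFilePrefix("streams"); // prefix used when resetFile() names each file
    WebHdfsPersistWriter writer = new WebHdfsPersistWriter(config);
    writer.prepare(null);                  // connects to WebHDFS and resolves the target path
    writer.write(new StreamsDatum("{\"id\":\"1\"}"));
    writer.cleanUp();                      // flushes and closes the current file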
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriterTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriterTask.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriterTask.java
index 00cf17f..eb808c1 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriterTask.java
+++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriterTask.java
@@ -19,38 +19,43 @@
package org.apache.streams.hdfs;
import org.apache.streams.core.StreamsDatum;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Random;
+/**
+ * WebHdfsPersistWriterTask writes to hdfs on behalf of
+ * @see org.apache.streams.hdfs.WebHdfsPersistWriter
+ */
public class WebHdfsPersistWriterTask implements Runnable {
- private static final Logger LOGGER = LoggerFactory.getLogger(WebHdfsPersistWriterTask.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(WebHdfsPersistWriterTask.class);
- private WebHdfsPersistWriter writer;
+ private WebHdfsPersistWriter writer;
- public WebHdfsPersistWriterTask(WebHdfsPersistWriter writer) {
- this.writer = writer;
- }
+ public WebHdfsPersistWriterTask(WebHdfsPersistWriter writer) {
+ this.writer = writer;
+ }
- @Override
- public void run() {
-
- while(true) {
- if( writer.persistQueue.peek() != null ) {
- try {
- StreamsDatum entry = writer.persistQueue.remove();
- writer.write(entry);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- try {
- Thread.sleep(new Random().nextInt(1));
- } catch (InterruptedException e) {}
- }
+ @Override
+ public void run() {
+ while (true) {
+ if ( writer.persistQueue.peek() != null ) {
+ try {
+ StreamsDatum entry = writer.persistQueue.remove();
+ writer.write(entry);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ try {
+ Thread.sleep(new Random().nextInt(1));
+ } catch (InterruptedException interrupt) {
+ LOGGER.trace("Interrupt", interrupt);
+ }
}
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-hdfs/src/test/java/org/apache/streams/hdfs/test/HdfsPersistConfigTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/test/java/org/apache/streams/hdfs/test/HdfsPersistConfigTest.java b/streams-contrib/streams-persist-hdfs/src/test/java/org/apache/streams/hdfs/test/HdfsPersistConfigTest.java
index 819414a..a35f124 100644
--- a/streams-contrib/streams-persist-hdfs/src/test/java/org/apache/streams/hdfs/test/HdfsPersistConfigTest.java
+++ b/streams-contrib/streams-persist-hdfs/src/test/java/org/apache/streams/hdfs/test/HdfsPersistConfigTest.java
@@ -33,7 +33,7 @@ import static org.junit.Assert.*;
*/
public class HdfsPersistConfigTest {
- private final static Logger LOGGER = LoggerFactory.getLogger(HdfsPersistConfigTest.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(HdfsPersistConfigTest.class);
@Test
public void getWriterFileUriTest()
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-hdfs/src/test/java/org/apache/streams/hdfs/test/TestHdfsPersist.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/test/java/org/apache/streams/hdfs/test/TestHdfsPersist.java b/streams-contrib/streams-persist-hdfs/src/test/java/org/apache/streams/hdfs/test/TestHdfsPersist.java
index ff33ec3..7191d9a 100644
--- a/streams-contrib/streams-persist-hdfs/src/test/java/org/apache/streams/hdfs/test/TestHdfsPersist.java
+++ b/streams-contrib/streams-persist-hdfs/src/test/java/org/apache/streams/hdfs/test/TestHdfsPersist.java
@@ -46,7 +46,7 @@ import java.util.List;
*/
public class TestHdfsPersist {
- private final static Logger LOGGER = LoggerFactory.getLogger(TestHdfsPersist.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(TestHdfsPersist.class);
ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReader.java b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReader.java
index d54e794..64f7200 100644
--- a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReader.java
+++ b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReader.java
@@ -18,19 +18,14 @@
package org.apache.streams.kafka;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import kafka.consumer.Consumer;
-import kafka.consumer.ConsumerConfig;
-import kafka.consumer.KafkaStream;
-import kafka.consumer.Whitelist;
-import kafka.javaapi.consumer.ConsumerConnector;
-import kafka.serializer.StringDecoder;
-import kafka.utils.VerifiableProperties;
import org.apache.streams.config.ComponentConfigurator;
import org.apache.streams.config.StreamsConfigurator;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsPersistReader;
import org.apache.streams.core.StreamsResultSet;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,113 +40,132 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
-public class KafkaPersistReader implements StreamsPersistReader, Serializable {
-
- public final static String STREAMS_ID = "KafkaPersistReader";
-
- private static final Logger LOGGER = LoggerFactory.getLogger(KafkaPersistReader.class);
-
- protected volatile Queue<StreamsDatum> persistQueue;
-
- private ObjectMapper mapper = new ObjectMapper();
-
- private KafkaConfiguration config;
-
- private ConsumerConnector consumerConnector;
-
- public List<KafkaStream<String, String>> inStreams;
-
- private ExecutorService executor = Executors.newSingleThreadExecutor();
+import kafka.consumer.Consumer;
+import kafka.consumer.ConsumerConfig;
+import kafka.consumer.KafkaStream;
+import kafka.consumer.Whitelist;
+import kafka.javaapi.consumer.ConsumerConnector;
+import kafka.serializer.StringDecoder;
+import kafka.utils.VerifiableProperties;
- public KafkaPersistReader() {
- this.config = new ComponentConfigurator<>(KafkaConfiguration.class)
- .detectConfiguration(StreamsConfigurator.getConfig().getConfig("kafka"));
- this.persistQueue = new ConcurrentLinkedQueue<>();
- }
+/**
+ * KafkaPersistReader reads documents from kafka.
+ */
+public class KafkaPersistReader implements StreamsPersistReader, Serializable {
- public KafkaPersistReader(Queue<StreamsDatum> persistQueue) {
- this.config = new ComponentConfigurator<>(KafkaConfiguration.class)
- .detectConfiguration(StreamsConfigurator.getConfig().getConfig("kafka"));
- this.persistQueue = persistQueue;
- }
+ public static final String STREAMS_ID = "KafkaPersistReader";
- public void setConfig(KafkaConfiguration config) {
- this.config = config;
- }
+ private static final Logger LOGGER = LoggerFactory.getLogger(KafkaPersistReader.class);
- @Override
- public String getId() {
- return STREAMS_ID;
- }
+ protected volatile Queue<StreamsDatum> persistQueue;
- @Override
- public void startStream() {
+ private ObjectMapper mapper = new ObjectMapper();
- Properties props = new Properties();
- props.setProperty("serializer.encoding", "UTF8");
+ private KafkaConfiguration config;
- ConsumerConfig consumerConfig = new ConsumerConfig(props);
+ private ConsumerConnector consumerConnector;
- consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
+ public List<KafkaStream<String, String>> inStreams;
- Whitelist topics = new Whitelist(config.getTopic());
- VerifiableProperties vprops = new VerifiableProperties(props);
+ private ExecutorService executor = Executors.newSingleThreadExecutor();
- inStreams = consumerConnector.createMessageStreamsByFilter(topics, 1, new StringDecoder(vprops), new StringDecoder(vprops));
+ /**
+ * KafkaPersistReader constructor - resolves KafkaConfiguration from JVM 'kafka'.
+ */
+ public KafkaPersistReader() {
+ this.config = new ComponentConfigurator<>(KafkaConfiguration.class)
+ .detectConfiguration(StreamsConfigurator.getConfig().getConfig("kafka"));
+ this.persistQueue = new ConcurrentLinkedQueue<>();
+ }
- for (final KafkaStream stream : inStreams) {
- executor.submit(new KafkaPersistReaderTask(this, stream));
- }
+ /**
+ * KafkaPersistReader constructor - uses supplied persistQueue.
+ */
+ public KafkaPersistReader(Queue<StreamsDatum> persistQueue) {
+ this.config = new ComponentConfigurator<>(KafkaConfiguration.class)
+ .detectConfiguration(StreamsConfigurator.getConfig().getConfig("kafka"));
+ this.persistQueue = persistQueue;
+ }
- }
+ public void setConfig(KafkaConfiguration config) {
+ this.config = config;
+ }
- @Override
- public StreamsResultSet readAll() {
- return readCurrent();
- }
+ @Override
+ public String getId() {
+ return STREAMS_ID;
+ }
- @Override
- public StreamsResultSet readCurrent() {
- return null;
- }
+ @Override
+ public void startStream() {
- @Override
- public StreamsResultSet readNew(BigInteger bigInteger) {
- return null;
- }
+ Properties props = new Properties();
+ props.setProperty("serializer.encoding", "UTF8");
- @Override
- public StreamsResultSet readRange(DateTime dateTime, DateTime dateTime2) {
- return null;
- }
+ ConsumerConfig consumerConfig = new ConsumerConfig(props);
- @Override
- public boolean isRunning() {
- return !executor.isShutdown() && !executor.isTerminated();
- }
+ consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
- private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
- Properties props = new Properties();
- props.put("zookeeper.connect", a_zookeeper);
- props.put("group.id", a_groupId);
- props.put("zookeeper.session.timeout.ms", "400");
- props.put("zookeeper.sync.time.ms", "200");
- props.put("auto.commit.interval.ms", "1000");
- return new ConsumerConfig(props);
- }
+ Whitelist topics = new Whitelist(config.getTopic());
+ VerifiableProperties vprops = new VerifiableProperties(props);
- @Override
- public void prepare(Object configurationObject) {
+ inStreams = consumerConnector.createMessageStreamsByFilter(topics, 1, new StringDecoder(vprops), new StringDecoder(vprops));
+ for (final KafkaStream stream : inStreams) {
+ executor.submit(new KafkaPersistReaderTask(this, stream));
}
- @Override
- public void cleanUp() {
- consumerConnector.shutdown();
- while( !executor.isTerminated()) {
- try {
- executor.awaitTermination(5, TimeUnit.SECONDS);
- } catch (InterruptedException ignored) {}
- }
+ }
+
+ @Override
+ public StreamsResultSet readAll() {
+ return readCurrent();
+ }
+
+ @Override
+ public StreamsResultSet readCurrent() {
+ return null;
+ }
+
+ @Override
+ public StreamsResultSet readNew(BigInteger bigInteger) {
+ return null;
+ }
+
+ @Override
+ public StreamsResultSet readRange(DateTime dateTime, DateTime dateTime2) {
+ return null;
+ }
+
+ @Override
+ public boolean isRunning() {
+ return !executor.isShutdown() && !executor.isTerminated();
+ }
+
+ private static ConsumerConfig createConsumerConfig(String zookeeper, String groupId) {
+ Properties props = new Properties();
+ props.put("zookeeper.connect", zookeeper);
+ props.put("group.id", groupId);
+ props.put("zookeeper.session.timeout.ms", "400");
+ props.put("zookeeper.sync.time.ms", "200");
+ props.put("auto.commit.interval.ms", "1000");
+ return new ConsumerConfig(props);
+ }
+
+ @Override
+ public void prepare(Object configurationObject) {
+
+ }
+
+ @Override
+ public void cleanUp() {
+ consumerConnector.shutdown();
+ while ( !executor.isTerminated()) {
+ try {
+ executor.awaitTermination(5, TimeUnit.SECONDS);
+ } catch (InterruptedException interrupt) {
+ LOGGER.trace("Interrupt", interrupt);
+ }
}
+ }
}
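
A comparable sketch for KafkaPersistReader, using only the methods visible above; the topic name is a placeholder assumption.

    // Hypothetical wiring; the topic value is an illustrative assumption.
    KafkaConfiguration kafkaConfig = new KafkaConfiguration();
    kafkaConfig.setTopic("activities");  // consumed via a Whitelist in startStream()
    KafkaPersistReader reader = new KafkaPersistReader();
    reader.setConfig(kafkaConfig);
    reader.startStream();                // submits one KafkaPersistReaderTask per stream
    // ... drain reader.persistQueue (package-visible), then:
    reader.cleanUp();                    // shuts down the connector and awaits the executor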
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReaderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReaderTask.java b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReaderTask.java
index 83493e0..199be73 100644
--- a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReaderTask.java
+++ b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReaderTask.java
@@ -18,45 +18,51 @@
package org.apache.streams.kafka;
-import kafka.consumer.ConsumerIterator;
-import kafka.consumer.KafkaStream;
-import kafka.message.MessageAndMetadata;
import org.apache.streams.core.StreamsDatum;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Random;
-public class KafkaPersistReaderTask implements Runnable {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(KafkaPersistReaderTask.class);
-
- private KafkaPersistReader reader;
- private KafkaStream<String,String> stream;
+import kafka.consumer.ConsumerIterator;
+import kafka.consumer.KafkaStream;
+import kafka.message.MessageAndMetadata;
- public KafkaPersistReaderTask(KafkaPersistReader reader, KafkaStream<String,String> stream) {
- this.reader = reader;
- this.stream = stream;
- }
+/**
+ * KafkaPersistReaderTask reads documents from kafka on behalf of
+ * @see org.apache.streams.kafka.KafkaPersistReader
+ */
+public class KafkaPersistReaderTask implements Runnable {
+ private static final Logger LOGGER = LoggerFactory.getLogger(KafkaPersistReaderTask.class);
+ private KafkaPersistReader reader;
+ private KafkaStream<String,String> stream;
- @Override
- public void run() {
+ public KafkaPersistReaderTask(KafkaPersistReader reader, KafkaStream<String,String> stream) {
+ this.reader = reader;
+ this.stream = stream;
+ }
- MessageAndMetadata<String,String> item;
- while(true) {
+ @Override
+ public void run() {
- ConsumerIterator<String, String> it = stream.iterator();
- while (it.hasNext()) {
- item = it.next();
- reader.persistQueue.add(new StreamsDatum(item.message()));
- }
- try {
- Thread.sleep(new Random().nextInt(100));
- } catch (InterruptedException e) {}
- }
+ MessageAndMetadata<String,String> item;
+ while (true) {
+ ConsumerIterator<String, String> it = stream.iterator();
+ while (it.hasNext()) {
+ item = it.next();
+ reader.persistQueue.add(new StreamsDatum(item.message()));
+ }
+ try {
+ Thread.sleep(new Random().nextInt(100));
+ } catch (InterruptedException interrupt) {
+ LOGGER.trace("Interrupt", interrupt);
+ }
}
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java
index 83032e6..40e125f 100644
--- a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java
+++ b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java
@@ -18,16 +18,15 @@
package org.apache.streams.kafka;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import kafka.javaapi.producer.Producer;
-import kafka.producer.KeyedMessage;
-import kafka.producer.ProducerConfig;
import org.apache.streams.config.ComponentConfigurator;
import org.apache.streams.config.StreamsConfigurator;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsPersistWriter;
import org.apache.streams.util.GuidUtils;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,99 +35,114 @@ import java.util.Properties;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
-public class KafkaPersistWriter implements StreamsPersistWriter, Serializable, Runnable {
-
- public final static String STREAMS_ID = "KafkaPersistWriter";
-
- private static final Logger LOGGER = LoggerFactory.getLogger(KafkaPersistWriter.class);
+import kafka.javaapi.producer.Producer;
+import kafka.producer.KeyedMessage;
+import kafka.producer.ProducerConfig;
- protected volatile Queue<StreamsDatum> persistQueue;
+/**
+ * KafkaPersistWriter writes documents to kafka.
+ */
+public class KafkaPersistWriter implements StreamsPersistWriter, Serializable, Runnable {
- private ObjectMapper mapper = new ObjectMapper();
+ public static final String STREAMS_ID = "KafkaPersistWriter";
- private KafkaConfiguration config;
+ private static final Logger LOGGER = LoggerFactory.getLogger(KafkaPersistWriter.class);
- private Producer<String, String> producer;
+ protected volatile Queue<StreamsDatum> persistQueue;
- public KafkaPersistWriter() {
- this.config = new ComponentConfigurator<>(KafkaConfiguration.class)
- .detectConfiguration(StreamsConfigurator.getConfig().getConfig("kafka"));
- this.persistQueue = new ConcurrentLinkedQueue<>();
- }
+ private ObjectMapper mapper = new ObjectMapper();
- public KafkaPersistWriter(Queue<StreamsDatum> persistQueue) {
- this.config = new ComponentConfigurator<>(KafkaConfiguration.class)
- .detectConfiguration(StreamsConfigurator.getConfig().getConfig("kafka"));
- this.persistQueue = persistQueue;
- }
+ private KafkaConfiguration config;
- public void setConfig(KafkaConfiguration config) {
- this.config = config;
- }
+ private Producer<String, String> producer;
- public void start() {
- Properties props = new Properties();
+ /**
+ * KafkaPersistWriter constructor - resolves KafkaConfiguration from JVM 'kafka'.
+ */
+ public KafkaPersistWriter() {
+ this.config = new ComponentConfigurator<>(KafkaConfiguration.class)
+ .detectConfiguration(StreamsConfigurator.getConfig().getConfig("kafka"));
+ this.persistQueue = new ConcurrentLinkedQueue<>();
+ }
- props.put("metadata.broker.list", config.getBrokerlist());
- props.put("serializer.class", "kafka.serializer.StringEncoder");
- props.put("partitioner.class", "org.apache.streams.kafka.StreamsPartitioner");
- props.put("request.required.acks", "1");
+ /**
+ * KafkaPersistWriter constructor - uses supplied persistQueue.
+ */
+ public KafkaPersistWriter(Queue<StreamsDatum> persistQueue) {
+ this.config = new ComponentConfigurator<>(KafkaConfiguration.class)
+ .detectConfiguration(StreamsConfigurator.getConfig().getConfig("kafka"));
+ this.persistQueue = persistQueue;
+ }
- ProducerConfig config = new ProducerConfig(props);
+ public void setConfig(KafkaConfiguration config) {
+ this.config = config;
+ }
- producer = new Producer<>(config);
+ /**
+ * Run the persist writer thread.
+ */
+ public void start() {
+ Properties props = new Properties();
- new Thread(new KafkaPersistWriterTask(this)).start();
- }
+ props.put("metadata.broker.list", config.getBrokerlist());
+ props.put("serializer.class", "kafka.serializer.StringEncoder");
+ props.put("partitioner.class", "org.apache.streams.kafka.StreamsPartitioner");
+ props.put("request.required.acks", "1");
- public void stop() {
- producer.close();
- }
+ ProducerConfig config = new ProducerConfig(props);
- public void setPersistQueue(Queue<StreamsDatum> persistQueue) {
- this.persistQueue = persistQueue;
- }
+ producer = new Producer<>(config);
- public Queue<StreamsDatum> getPersistQueue() {
- return this.persistQueue;
- }
+ new Thread(new KafkaPersistWriterTask(this)).start();
+ }
- @Override
- public String getId() {
- return STREAMS_ID;
- }
+ public void stop() {
+ producer.close();
+ }
- @Override
- public void write(StreamsDatum entry) {
+ public void setPersistQueue(Queue<StreamsDatum> persistQueue) {
+ this.persistQueue = persistQueue;
+ }
- try {
- String text = mapper.writeValueAsString(entry);
+ public Queue<StreamsDatum> getPersistQueue() {
+ return this.persistQueue;
+ }
- String hash = GuidUtils.generateGuid(text);
+ @Override
+ public String getId() {
+ return STREAMS_ID;
+ }
- KeyedMessage<String, String> data = new KeyedMessage<>(config.getTopic(), hash, text);
+ @Override
+ public void write(StreamsDatum entry) {
- producer.send(data);
+ try {
- } catch (JsonProcessingException e) {
- LOGGER.warn("save: {}", e);
- }// put
- }
+ String text = mapper.writeValueAsString(entry);
- @Override
- public void run() {
- start();
+ String hash = GuidUtils.generateGuid(text);
- // stop();
- }
+ KeyedMessage<String, String> data = new KeyedMessage<>(config.getTopic(), hash, text);
- @Override
- public void prepare(Object configurationObject) {
- start();
- }
+ producer.send(data);
- @Override
- public void cleanUp() {
- stop();
+ } catch (JsonProcessingException ex) {
+ LOGGER.warn("save: {}", ex);
}
+ }
+
+ @Override
+ public void run() {
+ start();
+ }
+
+ @Override
+ public void prepare(Object configurationObject) {
+ start();
+ }
+
+ @Override
+ public void cleanUp() {
+ stop();
+ }
}
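
And for KafkaPersistWriter: prepare() delegates to start(), which builds the producer from the resolved configuration and launches the background writer task. A minimal sketch, with a placeholder payload:

    // Minimal sketch; the datum payload is a placeholder.
    KafkaPersistWriter writer = new KafkaPersistWriter();
    writer.prepare(null);  // start(): creates the producer and spawns a KafkaPersistWriterTask
    writer.getPersistQueue().add(new StreamsDatum("{\"id\":\"1\"}"));
    writer.cleanUp();      // stop(): closes the producer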
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriterTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriterTask.java b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriterTask.java
index 5d8ee9e..dae7aa2 100644
--- a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriterTask.java
+++ b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriterTask.java
@@ -19,38 +19,45 @@
package org.apache.streams.kafka;
import org.apache.streams.core.StreamsDatum;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Random;
+/**
+ * KafkaPersistWriterTask writes documents to kafka on behalf of
+ * @see org.apache.streams.kafka.KafkaPersistWriter
+ */
public class KafkaPersistWriterTask implements Runnable {
- private static final Logger LOGGER = LoggerFactory.getLogger(KafkaPersistWriterTask.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(KafkaPersistWriterTask.class);
- private KafkaPersistWriter writer;
+ private KafkaPersistWriter writer;
- public KafkaPersistWriterTask(KafkaPersistWriter writer) {
- this.writer = writer;
- }
+ public KafkaPersistWriterTask(KafkaPersistWriter writer) {
+ this.writer = writer;
+ }
- @Override
- public void run() {
-
- while(true) {
- if( writer.getPersistQueue().peek() != null ) {
- try {
- StreamsDatum entry = writer.persistQueue.remove();
- writer.write(entry);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- try {
- Thread.sleep(new Random().nextInt(100));
- } catch (InterruptedException e) {}
- }
+ @Override
+ public void run() {
+ while (true) {
+ if ( writer.getPersistQueue().peek() != null ) {
+ try {
+ StreamsDatum entry = writer.persistQueue.remove();
+ writer.write(entry);
+ } catch (Exception ex) {
+ ex.printStackTrace();
+ }
+ }
+ try {
+ Thread.sleep(new Random().nextInt(100));
+ } catch (InterruptedException interrupt) {
+ LOGGER.trace("Interrupt", interrupt);
+ }
}
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/StreamsPartitioner.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/StreamsPartitioner.java b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/StreamsPartitioner.java
deleted file mode 100644
index ebfff9a..0000000
--- a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/StreamsPartitioner.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.kafka;
-
-/**
- * Created by sblackmon on 12/15/13.
- */
-import kafka.producer.Partitioner;
-import kafka.utils.VerifiableProperties;
-
-public class StreamsPartitioner implements Partitioner<String> {
- public StreamsPartitioner (VerifiableProperties props) {
-
- }
-
- public int partition(String key, int a_numPartitions) {
- int partition = 0;
- int offset = key.lastIndexOf('.');
- if (offset > 0) {
- partition = Integer.parseInt( key.substring(offset+1)) % a_numPartitions;
- }
- return partition;
- }
-
-}
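
For reference, the removed StreamsPartitioner derived a partition from a numeric key suffix: with 4 partitions, a key like "user.7" maps to partition 7 % 4 = 3, while a key with no '.' falls through to partition 0 (and a non-numeric suffix would throw NumberFormatException).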
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistReader.java b/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistReader.java
index ba77ff1..b6a7404 100644
--- a/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistReader.java
+++ b/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistReader.java
@@ -18,6 +18,14 @@
package org.apache.streams.mongo;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.DatumStatusCounter;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsPersistReader;
+import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.base.Strings;
@@ -30,13 +38,7 @@ import com.mongodb.DBObject;
import com.mongodb.MongoClient;
import com.mongodb.MongoCredential;
import com.mongodb.ServerAddress;
-import org.apache.streams.config.ComponentConfigurator;
-import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.core.DatumStatusCounter;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsPersistReader;
-import org.apache.streams.core.StreamsResultSet;
-import org.apache.streams.jackson.StreamsJacksonMapper;
+
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,230 +54,248 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+/**
+ * MongoPersistReader reads documents from mongo.
+ */
public class MongoPersistReader implements StreamsPersistReader {
- public static final String STREAMS_ID = "MongoPersistReader";
+ public static final String STREAMS_ID = "MongoPersistReader";
- private final static Logger LOGGER = LoggerFactory.getLogger(MongoPersistReader.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(MongoPersistReader.class);
- protected volatile Queue<StreamsDatum> persistQueue;
+ protected volatile Queue<StreamsDatum> persistQueue;
- private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
- private volatile AtomicLong lastWrite = new AtomicLong(System.currentTimeMillis());
+ private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+ private volatile AtomicLong lastWrite = new AtomicLong(System.currentTimeMillis());
- private ExecutorService executor;
+ private ExecutorService executor;
- private MongoConfiguration config;
+ private MongoConfiguration config;
- protected MongoClient client;
- protected DB db;
- protected DBCollection collection;
+ protected MongoClient client;
+ protected DB db;
+ protected DBCollection collection;
- protected DBCursor cursor;
+ protected DBCursor cursor;
- protected final ReadWriteLock lock = new ReentrantReadWriteLock();
-
- public MongoPersistReader() {
- this.config = new ComponentConfigurator<>(MongoConfiguration.class)
- .detectConfiguration(StreamsConfigurator.getConfig().getConfig("mongo"));
- }
+ protected final ReadWriteLock lock = new ReentrantReadWriteLock();
- public MongoPersistReader(MongoConfiguration config) {
- this.config = config;
- }
-
- public MongoPersistReader(Queue<StreamsDatum> persistQueue) {
- this.config = new ComponentConfigurator<>(MongoConfiguration.class)
- .detectConfiguration(StreamsConfigurator.getConfig().getConfig("mongo"));
- this.persistQueue = persistQueue;
- }
+ /**
+ * MongoPersistReader constructor - resolves MongoConfiguration from JVM 'mongo'.
+ */
+ public MongoPersistReader() {
+ this.config = new ComponentConfigurator<>(MongoConfiguration.class)
+ .detectConfiguration(StreamsConfigurator.getConfig().getConfig("mongo"));
+ }
- public void setPersistQueue(Queue<StreamsDatum> persistQueue) {
- this.persistQueue = persistQueue;
- }
+ /**
+ * MongoPersistReader constructor - uses supplied MongoConfiguration.
+ * @param config config
+ */
+ public MongoPersistReader(MongoConfiguration config) {
+ this.config = config;
+ }
- public Queue<StreamsDatum> getPersistQueue() {
- return persistQueue;
- }
+ /**
+ * MongoPersistReader constructor - uses supplied persistQueue.
+ * @param persistQueue persistQueue
+ */
+ public MongoPersistReader(Queue<StreamsDatum> persistQueue) {
+ this.config = new ComponentConfigurator<>(MongoConfiguration.class)
+ .detectConfiguration(StreamsConfigurator.getConfig().getConfig("mongo"));
+ this.persistQueue = persistQueue;
+ }
- public void stop() {
- }
+ public void setPersistQueue(Queue<StreamsDatum> persistQueue) {
+ this.persistQueue = persistQueue;
+ }
- @Override
- public String getId() {
- return STREAMS_ID;
- }
+ public Queue<StreamsDatum> getPersistQueue() {
+ return persistQueue;
+ }
- @Override
- public void prepare(Object configurationObject) {
+ public void stop() {
+ }
- connectToMongo();
+ @Override
+ public String getId() {
+ return STREAMS_ID;
+ }
- if( client == null ||
- collection == null )
- throw new RuntimeException("Unable to connect!");
+ @Override
+ public void prepare(Object configurationObject) {
- cursor = collection.find();
+ connectToMongo();
- if( cursor == null || !cursor.hasNext())
- throw new RuntimeException("Collection not present or empty!");
+ if ( client == null
+ || collection == null ) {
+ throw new RuntimeException("Unable to connect!");
+ }
+ cursor = collection.find();
- persistQueue = constructQueue();
+ if ( cursor == null
+ || !cursor.hasNext()) {
+ throw new RuntimeException("Collection not present or empty!");
+ }
- executor = Executors.newSingleThreadExecutor();
+ persistQueue = constructQueue();
- }
+ executor = Executors.newSingleThreadExecutor();
- @Override
- public void cleanUp() {
- stop();
- }
+ }
- protected StreamsDatum prepareDatum(DBObject dbObject) {
+ @Override
+ public void cleanUp() {
+ stop();
+ }
- ObjectNode objectNode;
- String id;
+ protected StreamsDatum prepareDatum(DBObject dbObject) {
- try {
- objectNode = mapper.readValue(dbObject.toString(), ObjectNode.class);
- id = objectNode.get("_id").get("$oid").asText();
- objectNode.remove("_id");
- } catch (IOException e) {
- LOGGER.warn("document isn't valid JSON.");
- return null;
- }
+ ObjectNode objectNode;
+ String id;
- return new StreamsDatum(objectNode, id);
+ try {
+ objectNode = mapper.readValue(dbObject.toString(), ObjectNode.class);
+ id = objectNode.get("_id").get("$oid").asText();
+ objectNode.remove("_id");
+ } catch (IOException ex) {
+ LOGGER.warn("document isn't valid JSON.");
+ return null;
}
- private synchronized void connectToMongo() {
+ return new StreamsDatum(objectNode, id);
+ }
- ServerAddress serverAddress = new ServerAddress(config.getHost(), config.getPort().intValue());
+ private synchronized void connectToMongo() {
- if (!Strings.isNullOrEmpty(config.getUser()) && !Strings.isNullOrEmpty(config.getPassword())) {
- MongoCredential credential =
- MongoCredential.createCredential(config.getUser(), config.getDb(), config.getPassword().toCharArray());
- client = new MongoClient(serverAddress, Lists.newArrayList(credential));
- } else {
- client = new MongoClient(serverAddress);
- }
+ ServerAddress serverAddress = new ServerAddress(config.getHost(), config.getPort().intValue());
- db = client.getDB(config.getDb());
+ if (!Strings.isNullOrEmpty(config.getUser()) && !Strings.isNullOrEmpty(config.getPassword())) {
+ MongoCredential credential =
+ MongoCredential.createCredential(config.getUser(), config.getDb(), config.getPassword().toCharArray());
+ client = new MongoClient(serverAddress, Lists.newArrayList(credential));
+ } else {
+ client = new MongoClient(serverAddress);
+ }
- if (!db.collectionExists(config.getCollection())) {
- db.createCollection(config.getCollection(), null);
- }
+ db = client.getDB(config.getDb());
- collection = db.getCollection(config.getCollection());
+ if (!db.collectionExists(config.getCollection())) {
+ db.createCollection(config.getCollection(), null);
}
- @Override
- public StreamsResultSet readAll() {
-
- try (DBCursor cursor = collection.find()) {
- while (cursor.hasNext()) {
- DBObject dbObject = cursor.next();
- StreamsDatum datum = prepareDatum(dbObject);
- write(datum);
- }
- }
+ collection = db.getCollection(config.getCollection());
+ }
- return readCurrent();
- }
+ @Override
+ public StreamsResultSet readAll() {
- @Override
- public void startStream() {
+ try (DBCursor cursor = collection.find()) {
+ while (cursor.hasNext()) {
+ DBObject dbObject = cursor.next();
+ StreamsDatum datum = prepareDatum(dbObject);
+ write(datum);
+ }
+ }
- LOGGER.debug("startStream");
- MongoPersistReaderTask readerTask = new MongoPersistReaderTask(this);
- Thread readerTaskThread = new Thread(readerTask);
- Future future = executor.submit(readerTaskThread);
+ return readCurrent();
+ }
- while( !future.isDone() && !future.isCancelled()) {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException ignored) {}
- }
+ @Override
+ public void startStream() {
- executor.shutdown();
+ LOGGER.debug("startStream");
+ MongoPersistReaderTask readerTask = new MongoPersistReaderTask(this);
+ Thread readerTaskThread = new Thread(readerTask);
+ Future future = executor.submit(readerTaskThread);
+ while ( !future.isDone() && !future.isCancelled()) {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException interrupt) {
+ LOGGER.trace("Interrupt", interrupt);
+ }
}
- @Override
- public StreamsResultSet readCurrent() {
+ executor.shutdown();
- StreamsResultSet current;
+ }
- try {
- lock.writeLock().lock();
- current = new StreamsResultSet(persistQueue);
- current.setCounter(new DatumStatusCounter());
- persistQueue = constructQueue();
- } finally {
- lock.writeLock().unlock();
- }
+ @Override
+ public StreamsResultSet readCurrent() {
- return current;
- }
+ StreamsResultSet current;
- //The locking may appear to be counter intuitive but we really don't care if multiple threads offer to the queue
- //as it is a synchronized queue. What we do care about is that we don't want to be offering to the current reference
- //if the queue is being replaced with a new instance
- protected void write(StreamsDatum entry) {
- boolean success;
- do {
- try {
- lock.readLock().lock();
- success = persistQueue.offer(entry);
- Thread.yield();
- } finally {
- lock.readLock().unlock();
- }
- }
- while (!success);
+ try {
+ lock.writeLock().lock();
+ current = new StreamsResultSet(persistQueue);
+ current.setCounter(new DatumStatusCounter());
+ persistQueue = constructQueue();
+ } finally {
+ lock.writeLock().unlock();
}
- @Override
- public StreamsResultSet readNew(BigInteger sequence) {
- return null;
+ return current;
+ }
+
+ // The locking may appear counterintuitive, but multiple threads may safely offer to the queue
+ // because it is a synchronized queue. What we do care about is that we never offer to the current
+ // reference while the queue is being replaced with a new instance.
+ protected void write(StreamsDatum entry) {
+ boolean success;
+ do {
+ try {
+ lock.readLock().lock();
+ success = persistQueue.offer(entry);
+ Thread.yield();
+ } finally {
+ lock.readLock().unlock();
+ }
}
+ while (!success);
+ }
- @Override
- public StreamsResultSet readRange(DateTime start, DateTime end) {
- return null;
- }
+ @Override
+ public StreamsResultSet readNew(BigInteger sequence) {
+ return null;
+ }
- @Override
- public boolean isRunning() {
- return !executor.isTerminated() || !executor.isShutdown();
- }
+ @Override
+ public StreamsResultSet readRange(DateTime start, DateTime end) {
+ return null;
+ }
- private Queue<StreamsDatum> constructQueue() {
- return Queues.synchronizedQueue(new LinkedBlockingQueue<StreamsDatum>(10000));
- }
+ @Override
+ public boolean isRunning() {
+ return !executor.isTerminated() || !executor.isShutdown();
+ }
- public class MongoPersistReaderTask implements Runnable {
+ private Queue<StreamsDatum> constructQueue() {
+ return Queues.synchronizedQueue(new LinkedBlockingQueue<StreamsDatum>(10000));
+ }
- private MongoPersistReader reader;
+ public class MongoPersistReaderTask implements Runnable {
- public MongoPersistReaderTask(MongoPersistReader reader) {
- this.reader = reader;
- }
+ private MongoPersistReader reader;
- @Override
- public void run() {
+ public MongoPersistReaderTask(MongoPersistReader reader) {
+ this.reader = reader;
+ }
- try {
- while(reader.cursor.hasNext()) {
- DBObject dbObject = reader.cursor.next();
- StreamsDatum datum = reader.prepareDatum(dbObject);
- reader.write(datum);
- }
- } finally {
- reader.cursor.close();
- }
+ @Override
+ public void run() {
+ try {
+ while (reader.cursor.hasNext()) {
+ DBObject dbObject = reader.cursor.next();
+ StreamsDatum datum = reader.prepareDatum(dbObject);
+ reader.write(datum);
}
+ } finally {
+ reader.cursor.close();
+ }
}
+
+ }
}
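The write()/readCurrent() pair above is a reusable pattern: producers offer
under the read lock, and the consumer swaps the entire queue out under the
write lock. A minimal self-contained sketch of the same pattern, using a plain
bounded LinkedBlockingQueue where the reader uses Guava's
Queues.synchronizedQueue wrapper (QueueSwapExample and its method names are
illustrative, not part of this commit):

    import java.util.Queue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.locks.ReadWriteLock;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    class QueueSwapExample<T> {
      private final ReadWriteLock lock = new ReentrantReadWriteLock();
      private volatile Queue<T> queue = new LinkedBlockingQueue<>(10_000);

      // Producers share the READ lock: they may run concurrently with each
      // other because the queue itself is thread-safe; the lock only has to
      // exclude the swap below.
      void offer(T item) {
        boolean success;
        do {
          lock.readLock().lock();
          try {
            success = queue.offer(item);  // false when the bounded queue is full
          } finally {
            lock.readLock().unlock();
          }
          Thread.yield();  // give the consumer a chance when the queue is full
        } while (!success);
      }

      // The consumer takes the WRITE lock, so no producer can be mid-offer
      // against the old reference while it is replaced.
      Queue<T> drainAndSwap() {
        lock.writeLock().lock();
        try {
          Queue<T> current = queue;
          queue = new LinkedBlockingQueue<>(10_000);
          return current;
        } finally {
          lock.writeLock().unlock();
        }
      }
    }

A bounded queue is what makes the retry loop meaningful: offer() only returns
false when the queue is full, so the loop doubles as backpressure on producers.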
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java b/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java
index 5f6ac1f..6072f58 100644
--- a/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java
+++ b/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistWriter.java
@@ -18,6 +18,12 @@
package org.apache.streams.mongo;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsPersistWriter;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.base.Strings;
@@ -29,14 +35,12 @@ import com.mongodb.MongoClient;
import com.mongodb.MongoCredential;
import com.mongodb.ServerAddress;
import com.mongodb.util.JSON;
-import org.apache.streams.config.ComponentConfigurator;
-import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsPersistWriter;
-import org.apache.streams.jackson.StreamsJacksonMapper;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.Closeable;
+import java.io.Flushable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -50,209 +54,217 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-public class MongoPersistWriter implements StreamsPersistWriter, Runnable {
+public class MongoPersistWriter implements StreamsPersistWriter, Runnable, Flushable, Closeable {
- public final static String STREAMS_ID = "MongoPersistWriter";
+ public static final String STREAMS_ID = "MongoPersistWriter";
- private final static Logger LOGGER = LoggerFactory.getLogger(MongoPersistWriter.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(MongoPersistWriter.class);
- private final static long MAX_WRITE_LATENCY = 1000;
+ private static final long MAX_WRITE_LATENCY = 1000;
- protected volatile Queue<StreamsDatum> persistQueue;
+ protected volatile Queue<StreamsDatum> persistQueue;
- private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
- private volatile AtomicLong lastWrite = new AtomicLong(System.currentTimeMillis());
- private ScheduledExecutorService backgroundFlushTask = Executors.newSingleThreadScheduledExecutor();
+ private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+ private volatile AtomicLong lastWrite = new AtomicLong(System.currentTimeMillis());
+ private ScheduledExecutorService backgroundFlushTask = Executors.newSingleThreadScheduledExecutor();
- private MongoConfiguration config;
+ private MongoConfiguration config;
- protected MongoClient client;
- protected DB db;
- protected DBCollection collection;
+ protected MongoClient client;
+ protected DB db;
+ protected DBCollection collection;
- protected List<DBObject> insertBatch = new ArrayList<>();
+ protected List<DBObject> insertBatch = new ArrayList<>();
- protected final ReadWriteLock lock = new ReentrantReadWriteLock();
+ protected final ReadWriteLock lock = new ReentrantReadWriteLock();
- public MongoPersistWriter() {
- this(new ComponentConfigurator<>(MongoConfiguration.class)
- .detectConfiguration(StreamsConfigurator.getConfig().getConfig("mongo")));
- }
-
- public MongoPersistWriter(MongoConfiguration config) {
- this.config = config;
- }
+ public MongoPersistWriter() {
+ this(new ComponentConfigurator<>(MongoConfiguration.class)
+ .detectConfiguration(StreamsConfigurator.getConfig().getConfig("mongo")));
+ }
- public void setPersistQueue(Queue<StreamsDatum> persistQueue) {
- this.persistQueue = persistQueue;
- }
-
- public Queue<StreamsDatum> getPersistQueue() {
- return persistQueue;
- }
+ public MongoPersistWriter(MongoConfiguration config) {
+ this.config = config;
+ }
- @Override
- public String getId() {
- return STREAMS_ID;
- }
+ public void setPersistQueue(Queue<StreamsDatum> persistQueue) {
+ this.persistQueue = persistQueue;
+ }
- @Override
- public void write(StreamsDatum streamsDatum) {
+ public Queue<StreamsDatum> getPersistQueue() {
+ return persistQueue;
+ }
- DBObject dbObject = prepareObject(streamsDatum);
- if (dbObject != null) {
- addToBatch(dbObject);
- flushIfNecessary();
- }
- }
+ @Override
+ public String getId() {
+ return STREAMS_ID;
+ }
- public void flush() throws IOException {
- try {
- LOGGER.debug("Attempting to flush {} items to mongo", insertBatch.size());
- lock.writeLock().lock();
- collection.insert(insertBatch);
- lastWrite.set(System.currentTimeMillis());
- insertBatch = new ArrayList<>();
- } finally {
- lock.writeLock().unlock();
- }
+ @Override
+ public void write(StreamsDatum streamsDatum) {
+ DBObject dbObject = prepareObject(streamsDatum);
+ if (dbObject != null) {
+ addToBatch(dbObject);
+ flushIfNecessary();
}
-
- public synchronized void close() throws IOException {
-// client.cleanCursors(true);
-// backgroundFlushTask.shutdownNow();
+ }
+
+ @Override
+ public void flush() throws IOException {
+ try {
+ LOGGER.debug("Attempting to flush {} items to mongo", insertBatch.size());
+ lock.writeLock().lock();
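+      // the write lock excludes addToBatch()'s read lock, so the batch cannot grow mid-insert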
+ collection.insert(insertBatch);
+ lastWrite.set(System.currentTimeMillis());
+ insertBatch = new ArrayList<>();
+ } finally {
+ lock.writeLock().unlock();
}
- public void start() {
- connectToMongo();
- backgroundFlushTask.scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
- flushIfNecessary();
- }
- }, 0, MAX_WRITE_LATENCY * 2, TimeUnit.MILLISECONDS);
+ }
+
+ public synchronized void close() throws IOException {
+ client.close();
+ backgroundFlushTask.shutdownNow();
+ }
+
+ /**
+ * start write thread.
+ */
+ public void start() {
+ connectToMongo();
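+    // re-evaluate the flush conditions on a fixed cadence of MAX_WRITE_LATENCY * 2 ms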
+ backgroundFlushTask.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ flushIfNecessary();
+ }
+ }, 0, MAX_WRITE_LATENCY * 2, TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * stop.
+ */
+ public void stop() {
+
+ try {
+ flush();
+ } catch (IOException ex) {
+ LOGGER.error("Error flushing", ex);
}
-
- public void stop() {
-
- try {
- flush();
- } catch (IOException e) {
- LOGGER.error("Error flushing", e);
- }
- try {
- close();
- } catch (IOException e) {
- LOGGER.error("Error closing", e);
- }
- try {
- backgroundFlushTask.shutdown();
- // Wait a while for existing tasks to terminate
- if (!backgroundFlushTask.awaitTermination(15, TimeUnit.SECONDS)) {
- backgroundFlushTask.shutdownNow(); // Cancel currently executing tasks
- // Wait a while for tasks to respond to being cancelled
- if (!backgroundFlushTask.awaitTermination(15, TimeUnit.SECONDS)) {
- LOGGER.error("Stream did not terminate");
- }
- }
- } catch (InterruptedException ie) {
- // (Re-)Cancel if current thread also interrupted
- backgroundFlushTask.shutdownNow();
- // Preserve interrupt status
- Thread.currentThread().interrupt();
- }
-
+ try {
+ close();
+ } catch (IOException ex) {
+ LOGGER.error("Error closing", ex);
}
-
- @Override
- public void run() {
-
- while (true) {
- if (persistQueue.peek() != null) {
- try {
- StreamsDatum entry = persistQueue.remove();
- write(entry);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- try {
- Thread.sleep(new Random().nextInt(1));
- } catch (InterruptedException ignored) {
- }
+ try {
+ backgroundFlushTask.shutdown();
+ // Wait a while for existing tasks to terminate
+ if (!backgroundFlushTask.awaitTermination(15, TimeUnit.SECONDS)) {
+ backgroundFlushTask.shutdownNow(); // Cancel currently executing tasks
+ // Wait a while for tasks to respond to being cancelled
+ if (!backgroundFlushTask.awaitTermination(15, TimeUnit.SECONDS)) {
+ LOGGER.error("Stream did not terminate");
}
-
- }
-
- @Override
- public void prepare(Object configurationObject) {
- this.persistQueue = new ConcurrentLinkedQueue<>();
- start();
+ }
+ } catch (InterruptedException ie) {
+ // (Re-)Cancel if current thread also interrupted
+ backgroundFlushTask.shutdownNow();
+ // Preserve interrupt status
+ Thread.currentThread().interrupt();
}
- @Override
- public void cleanUp() {
- stop();
- }
+ }
- protected void flushIfNecessary() {
- long lastLatency = System.currentTimeMillis() - lastWrite.get();
- //Flush iff the size > 0 AND the size is divisible by 100 or the time between now and the last flush is greater
- //than the maximum desired latency
- if (insertBatch.size() > 0 && (insertBatch.size() % 100 == 0 || lastLatency > MAX_WRITE_LATENCY)) {
- try {
- flush();
- } catch (IOException e) {
- LOGGER.error("Error writing to Mongo", e);
- }
- }
- }
+ @Override
+ public void run() {
- protected void addToBatch(DBObject dbObject) {
+ while (true) {
+ if (persistQueue.peek() != null) {
try {
- lock.readLock().lock();
- insertBatch.add(dbObject);
- } finally {
- lock.readLock().unlock();
+ StreamsDatum entry = persistQueue.remove();
+ write(entry);
+ } catch (Exception ex) {
+ ex.printStackTrace();
}
+ }
+ try {
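+        // nextInt(1) always returns 0, so this sleeps 0 ms between polls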
+ Thread.sleep(new Random().nextInt(1));
+ } catch (InterruptedException interrupt) {
+ LOGGER.trace("Interrupt", interrupt);
+ }
}
- protected DBObject prepareObject(StreamsDatum streamsDatum) {
- DBObject dbObject = null;
- if (streamsDatum.getDocument() instanceof String) {
- dbObject = (DBObject) JSON.parse((String) streamsDatum.getDocument());
- } else {
- try {
- ObjectNode node = mapper.valueToTree(streamsDatum.getDocument());
- dbObject = (DBObject) JSON.parse(node.toString());
- } catch (Exception e) {
- LOGGER.error("Unsupported type: " + streamsDatum.getDocument().getClass(), e);
- }
- }
- return dbObject;
+ }
+
+ @Override
+ public void prepare(Object configurationObject) {
+ this.persistQueue = new ConcurrentLinkedQueue<>();
+ start();
+ }
+
+ @Override
+ public void cleanUp() {
+ stop();
+ }
+
+ protected void flushIfNecessary() {
+ long lastLatency = System.currentTimeMillis() - lastWrite.get();
+    // Flush only when the batch is non-empty AND either its size is a multiple of 100
+    // or the time since the last flush exceeds the maximum desired latency
+    // (a condensed sketch of this policy follows this diff)
+ if (insertBatch.size() > 0 && (insertBatch.size() % 100 == 0 || lastLatency > MAX_WRITE_LATENCY)) {
+ try {
+ flush();
+ } catch (IOException ex) {
+ LOGGER.error("Error writing to Mongo", ex);
+ }
}
+ }
+
+ protected void addToBatch(DBObject dbObject) {
+ try {
+ lock.readLock().lock();
+ insertBatch.add(dbObject);
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ protected DBObject prepareObject(StreamsDatum streamsDatum) {
+ DBObject dbObject = null;
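+    // String documents are assumed to already be JSON; other types are serialized through Jackson first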
+ if (streamsDatum.getDocument() instanceof String) {
+ dbObject = (DBObject) JSON.parse((String) streamsDatum.getDocument());
+ } else {
+ try {
+ ObjectNode node = mapper.valueToTree(streamsDatum.getDocument());
+ dbObject = (DBObject) JSON.parse(node.toString());
+ } catch (Exception ex) {
+ LOGGER.error("Unsupported type: " + streamsDatum.getDocument().getClass(), ex);
+ }
+ }
+ return dbObject;
+ }
- private synchronized void connectToMongo() {
-
- ServerAddress serverAddress = new ServerAddress(config.getHost(), config.getPort().intValue());
+ private synchronized void connectToMongo() {
- if (!Strings.isNullOrEmpty(config.getUser()) && !Strings.isNullOrEmpty(config.getPassword())) {
- MongoCredential credential =
- MongoCredential.createCredential(config.getUser(), config.getDb(), config.getPassword().toCharArray());
- client = new MongoClient(serverAddress, Lists.newArrayList(credential));
- } else {
- client = new MongoClient(serverAddress);
- }
+ ServerAddress serverAddress = new ServerAddress(config.getHost(), config.getPort().intValue());
- db = client.getDB(config.getDb());
+ if (!Strings.isNullOrEmpty(config.getUser()) && !Strings.isNullOrEmpty(config.getPassword())) {
+ MongoCredential credential =
+ MongoCredential.createCredential(config.getUser(), config.getDb(), config.getPassword().toCharArray());
+ client = new MongoClient(serverAddress, Lists.newArrayList(credential));
+ } else {
+ client = new MongoClient(serverAddress);
+ }
- if (!db.collectionExists(config.getCollection())) {
- db.createCollection(config.getCollection(), null);
- }
+ db = client.getDB(config.getDb());
- collection = db.getCollection(config.getCollection());
+ if (!db.collectionExists(config.getCollection())) {
+ db.createCollection(config.getCollection(), null);
}
+ collection = db.getCollection(config.getCollection());
+ }
+
}
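flushIfNecessary() above encodes the writer's batching policy: flush when the
batch is non-empty and either another 100 items have accumulated or
MAX_WRITE_LATENCY has elapsed since the last write. Because write() runs the
check after every add, multiples of 100 are never skipped. A condensed sketch
of just that policy (FlushPolicyExample is an illustrative name, not part of
this commit):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.atomic.AtomicLong;

    class FlushPolicyExample {
      private static final long MAX_WRITE_LATENCY = 1000;  // ms, matching the writer
      private final AtomicLong lastWrite = new AtomicLong(System.currentTimeMillis());
      private final List<Object> batch = new ArrayList<>();

      void add(Object item) {
        batch.add(item);
        if (shouldFlush()) {
          // ... write the batch out here, then reset the bookkeeping:
          lastWrite.set(System.currentTimeMillis());
          batch.clear();
        }
      }

      // Non-empty AND (another full hundred items, or the latency bound elapsed).
      boolean shouldFlush() {
        long sinceLastWrite = System.currentTimeMillis() - lastWrite.get();
        return !batch.isEmpty()
            && (batch.size() % 100 == 0 || sinceLastWrite > MAX_WRITE_LATENCY);
      }
    }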
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-mongo/src/test/java/org/apache/streams/mongo/test/MongoPersistIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-mongo/src/test/java/org/apache/streams/mongo/test/MongoPersistIT.java b/streams-contrib/streams-persist-mongo/src/test/java/org/apache/streams/mongo/test/MongoPersistIT.java
index 18f5a62..2a2e170 100644
--- a/streams-contrib/streams-persist-mongo/src/test/java/org/apache/streams/mongo/test/MongoPersistIT.java
+++ b/streams-contrib/streams-persist-mongo/src/test/java/org/apache/streams/mongo/test/MongoPersistIT.java
@@ -18,12 +18,6 @@
package org.apache.streams.mongo.test;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import com.typesafe.config.ConfigParseOptions;
-import org.apache.commons.io.Charsets;
-import org.apache.commons.io.IOUtils;
import org.apache.streams.config.ComponentConfigurator;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsResultSet;
@@ -32,6 +26,14 @@ import org.apache.streams.mongo.MongoConfiguration;
import org.apache.streams.mongo.MongoPersistReader;
import org.apache.streams.mongo.MongoPersistWriter;
import org.apache.streams.pojo.json.Activity;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigParseOptions;
+
+import org.apache.commons.io.Charsets;
+import org.apache.commons.io.IOUtils;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
@@ -48,64 +50,64 @@ import static org.junit.Assert.assertEquals;
*/
public class MongoPersistIT {
- private final static Logger LOGGER = LoggerFactory.getLogger(MongoPersistIT.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(MongoPersistIT.class);
- ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+ ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
- MongoConfiguration testConfiguration;
+ MongoConfiguration testConfiguration;
- int count = 0;
+ int count = 0;
- @Before
- public void setup() throws Exception {
+ @Before
+ public void setup() throws Exception {
- Config reference = ConfigFactory.load();
- File conf_file = new File("target/test-classes/MongoPersistIT.conf");
- assert(conf_file.exists());
- Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
- Config typesafe = testResourceConfig.withFallback(reference).resolve();
- testConfiguration = new ComponentConfigurator<>(MongoConfiguration.class).detectConfiguration(typesafe, "mongo");
+ Config reference = ConfigFactory.load();
+ File conf_file = new File("target/test-classes/MongoPersistIT.conf");
+ assert(conf_file.exists());
+ Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
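+    // layer the test-resource config over reference.conf defaults and resolve substitutions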
+ Config typesafe = testResourceConfig.withFallback(reference).resolve();
+ testConfiguration = new ComponentConfigurator<>(MongoConfiguration.class).detectConfiguration(typesafe, "mongo");
- }
+ }
- @Test
- public void testMongoPersist() throws Exception {
+ @Test
+ public void testMongoPersist() throws Exception {
- MongoPersistWriter writer = new MongoPersistWriter(testConfiguration);
+ MongoPersistWriter writer = new MongoPersistWriter(testConfiguration);
- writer.prepare(null);
+ writer.prepare(null);
- InputStream testActivityFolderStream = MongoPersistIT.class.getClassLoader()
- .getResourceAsStream("activities");
- List<String> files = IOUtils.readLines(testActivityFolderStream, Charsets.UTF_8);
+ InputStream testActivityFolderStream = MongoPersistIT.class.getClassLoader()
+ .getResourceAsStream("activities");
+ List<String> files = IOUtils.readLines(testActivityFolderStream, Charsets.UTF_8);
- for( String file : files) {
- LOGGER.info("File: " + file );
- InputStream testActivityFileStream = MongoPersistIT.class.getClassLoader()
- .getResourceAsStream("activities/" + file);
- Activity activity = MAPPER.readValue(testActivityFileStream, Activity.class);
- activity.getAdditionalProperties().remove("$license");
- StreamsDatum datum = new StreamsDatum(activity, activity.getVerb());
- writer.write( datum );
- LOGGER.info("Wrote: " + activity.getVerb() );
- count++;
- }
+ for( String file : files) {
+ LOGGER.info("File: " + file );
+ InputStream testActivityFileStream = MongoPersistIT.class.getClassLoader()
+ .getResourceAsStream("activities/" + file);
+ Activity activity = MAPPER.readValue(testActivityFileStream, Activity.class);
+ activity.getAdditionalProperties().remove("$license");
+ StreamsDatum datum = new StreamsDatum(activity, activity.getVerb());
+ writer.write( datum );
+ LOGGER.info("Wrote: " + activity.getVerb() );
+ count++;
+ }
- LOGGER.info("Total Written: {}", count );
+ LOGGER.info("Total Written: {}", count );
- assertEquals( 89, count );
+ assertEquals( 89, count );
- writer.cleanUp();
+ writer.cleanUp();
- MongoPersistReader reader = new MongoPersistReader(testConfiguration);
+ MongoPersistReader reader = new MongoPersistReader(testConfiguration);
- reader.prepare(null);
+ reader.prepare(null);
- StreamsResultSet resultSet = reader.readAll();
+ StreamsResultSet resultSet = reader.readAll();
- LOGGER.info("Total Read: {}", resultSet.size() );
+ LOGGER.info("Total Read: {}", resultSet.size() );
- assertEquals( 89, resultSet.size() );
+ assertEquals( 89, resultSet.size() );
- }
+ }
}
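One more pattern worth pulling out of the writer diff above: stop() follows the
two-phase shutdown idiom recommended by the java.util.concurrent.ExecutorService
javadoc. In isolation (ShutdownExample is an illustrative name; the 15-second
windows match the writer's):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.TimeUnit;

    class ShutdownExample {
      static void shutdownGracefully(ExecutorService pool) {
        pool.shutdown();  // stop accepting new tasks
        try {
          // phase 1: wait for in-flight tasks to finish on their own
          if (!pool.awaitTermination(15, TimeUnit.SECONDS)) {
            pool.shutdownNow();  // phase 2: interrupt whatever is still running
            if (!pool.awaitTermination(15, TimeUnit.SECONDS)) {
              System.err.println("pool did not terminate");
            }
          }
        } catch (InterruptedException ie) {
          pool.shutdownNow();
          Thread.currentThread().interrupt();  // preserve the interrupt status
        }
      }
    }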