You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/06/25 23:15:08 UTC
git commit: Created MongoReader. Tested as perpetual stream
Repository: incubator-streams
Updated Branches:
refs/heads/STREAMS-47 [created] 74a38642e
Created MongoReader. Tested as perpetual stream
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/74a38642
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/74a38642
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/74a38642
Branch: refs/heads/STREAMS-47
Commit: 74a38642e6eda05ee6603a66a1e0ba7ea35815d0
Parents: a33c215
Author: sblackmon <sb...@w2odigital.com>
Authored: Wed Jun 25 16:14:34 2014 -0500
Committer: sblackmon <sb...@w2odigital.com>
Committed: Wed Jun 25 16:14:34 2014 -0500
----------------------------------------------------------------------
.../streams/mongo/MongoPersistReader.java | 245 +++++++++++++++++++
.../streams/mongo/MongoPersistReaderTask.java | 74 ++++++
.../src/main/resources/reference.json | 8 +
.../src/main/resources/reference.properties | 10 -
4 files changed, 327 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/74a38642/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistReader.java b/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistReader.java
new file mode 100644
index 0000000..d19ccfa
--- /dev/null
+++ b/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistReader.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.mongo;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Queues;
+import com.mongodb.*;
+import com.mongodb.util.JSON;
+import com.typesafe.config.Config;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.*;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.math.BigInteger;
+import java.net.UnknownHostException;
+import java.util.List;
+import java.util.Queue;
+import java.util.Random;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class MongoPersistReader implements StreamsPersistReader {
+
+ public static final String STREAMS_ID = "MongoPersistReader";
+
+ private final static Logger LOGGER = LoggerFactory.getLogger(MongoPersistReader.class);
+ private final static long MAX_WRITE_LATENCY = 1000;
+
+ protected volatile Queue<StreamsDatum> persistQueue;
+
+ private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+ private volatile AtomicLong lastWrite = new AtomicLong(System.currentTimeMillis());
+
+ private ExecutorService executor;
+
+ private MongoConfiguration config;
+
+ protected DB client;
+ protected DBAddress dbaddress;
+ protected DBCollection collection;
+
+ protected DBCursor cursor;
+
+ protected List<DBObject> insertBatch = Lists.newArrayList();
+
+ protected final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+ public MongoPersistReader() {
+ Config config = StreamsConfigurator.config.getConfig("mongo");
+ this.config = MongoConfigurator.detectConfiguration(config);
+ }
+
+ public MongoPersistReader(MongoConfiguration config) {
+ this.config = config;
+ }
+
+ public MongoPersistReader(Queue<StreamsDatum> persistQueue) {
+ Config config = StreamsConfigurator.config.getConfig("mongo");
+ this.config = MongoConfigurator.detectConfiguration(config);
+ this.persistQueue = persistQueue;
+ }
+
+ public void setPersistQueue(Queue<StreamsDatum> persistQueue) {
+ this.persistQueue = persistQueue;
+ }
+
+ public Queue<StreamsDatum> getPersistQueue() {
+ return persistQueue;
+ }
+
+ public void stop() {
+
+ try {
+ client.cleanCursors(true);
+ client.requestDone();
+ } catch (Exception e) {
+ } finally {
+ client.requestDone();
+ }
+ }
+
+ @Override
+ public void prepare(Object configurationObject) {
+
+ connectToMongo();
+
+ if( client == null ||
+ dbaddress == null ||
+ collection == null )
+ throw new RuntimeException("Unable to connect!");
+
+ cursor = collection.find();
+
+ if( cursor == null ||
+ cursor.hasNext() == false )
+ throw new RuntimeException("Collection not present or empty!");
+
+ persistQueue = constructQueue();
+
+ executor = Executors.newSingleThreadExecutor();
+
+ }
+
+ @Override
+ public void cleanUp() {
+ stop();
+ }
+
+ protected StreamsDatum prepareDatum(DBObject dbObject) {
+
+ StreamsDatum datum = new StreamsDatum(dbObject.toString());
+
+ return datum;
+ }
+
+ private synchronized void connectToMongo() {
+
+ try {
+ dbaddress = new DBAddress(config.getHost(), config.getPort().intValue(), config.getDb());
+ } catch (UnknownHostException e) {
+ e.printStackTrace();
+ return;
+ }
+
+ client = MongoClient.connect(dbaddress);
+
+ if (!Strings.isNullOrEmpty(config.getUser()) && !Strings.isNullOrEmpty(config.getPassword()))
+ client.authenticate(config.getUser(), config.getPassword().toCharArray());
+
+ if (!client.collectionExists(config.getCollection())) {
+ client.createCollection(config.getCollection(), null);
+ }
+ ;
+
+ collection = client.getCollection(config.getCollection());
+ }
+
+ @Override
+ public StreamsResultSet readAll() {
+
+ DBCursor cursor = collection.find();
+ try {
+ while(cursor.hasNext()) {
+ DBObject dbObject = cursor.next();
+ StreamsDatum datum = prepareDatum(dbObject);
+ write(datum);
+ }
+ } finally {
+ cursor.close();
+ }
+
+ return readCurrent();
+ }
+
+ @Override
+ public void startStream() {
+
+ LOGGER.debug("startStream");
+ MongoPersistReaderTask readerTask = new MongoPersistReaderTask(this);
+ executor.submit(readerTask);
+ executor.shutdown();
+
+ }
+
+ @Override
+ public StreamsResultSet readCurrent() {
+
+ StreamsResultSet current;
+
+ try {
+ lock.writeLock().lock();
+ current = new StreamsResultSet(persistQueue);
+ current.setCounter(new DatumStatusCounter());
+// current.getCounter().add(countersCurrent);
+// countersTotal.add(countersCurrent);
+// countersCurrent = new DatumStatusCounter();
+ persistQueue = constructQueue();
+ } finally {
+ lock.writeLock().unlock();
+ }
+
+ return current;
+ }
+
+ //The locking may appear to be counter intuitive but we really don't care if multiple threads offer to the queue
+ //as it is a synchronized queue. What we do care about is that we don't want to be offering to the current reference
+ //if the queue is being replaced with a new instance
+ protected void write(StreamsDatum entry) {
+ boolean success;
+ do {
+ try {
+ lock.readLock().lock();
+ success = persistQueue.offer(entry);
+ Thread.yield();
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+ while (!success);
+ }
+
+ @Override
+ public StreamsResultSet readNew(BigInteger sequence) {
+ return null;
+ }
+
+ @Override
+ public StreamsResultSet readRange(DateTime start, DateTime end) {
+ return null;
+ }
+
+ @Override
+ public boolean isRunning() {
+ return !executor.isTerminated();
+ }
+
+ private Queue<StreamsDatum> constructQueue() {
+ return Queues.synchronizedQueue(new LinkedBlockingQueue<StreamsDatum>(10000));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/74a38642/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistReaderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistReaderTask.java b/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistReaderTask.java
new file mode 100644
index 0000000..b66e628
--- /dev/null
+++ b/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistReaderTask.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.mongo;
+
+import com.google.common.base.Strings;
+import com.mongodb.DBObject;
+import org.apache.streams.core.DatumStatus;
+import org.apache.streams.core.StreamsDatum;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+
+/**
+ * Runnable that drains the cursor of a {@link MongoPersistReader} into that reader's queue,
+ * one {@link StreamsDatum} per document, and closes the cursor when done.
+ */
+public class MongoPersistReaderTask implements Runnable {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(MongoPersistReaderTask.class);
+
+    private final MongoPersistReader reader;
+
+    /**
+     * @param reader reader whose cursor is consumed and whose queue receives the datums
+     */
+    public MongoPersistReaderTask(MongoPersistReader reader) {
+        this.reader = reader;
+    }
+
+    /**
+     * Iterates the reader's cursor to exhaustion, converting each document with
+     * {@code reader.prepareDatum} and enqueueing it; the cursor is always closed afterwards.
+     */
+    @Override
+    public void run() {
+        try {
+            while (reader.cursor.hasNext()) {
+                DBObject dbObject = reader.cursor.next();
+                write(reader.prepareDatum(dbObject));
+            }
+        } catch (RuntimeException e) {
+            // The reader submits this task and discards the Future, so without this log
+            // a failure while reading would go unnoticed.
+            LOGGER.error("Reading from Mongo failed; stream terminated early", e);
+            throw e;
+        } finally {
+            reader.cursor.close();
+        }
+    }
+
+    /**
+     * Enqueues a datum on the reader's current queue, retrying until it is accepted.
+     *
+     * <p>Delegates to {@code MongoPersistReader.write}, which takes the reader's read lock
+     * around each offer so the datum is never offered to a queue that is being replaced.
+     *
+     * @param entry datum to enqueue
+     */
+    protected void write(StreamsDatum entry) {
+        reader.write(entry);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/74a38642/streams-contrib/streams-persist-mongo/src/main/resources/reference.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-mongo/src/main/resources/reference.json b/streams-contrib/streams-persist-mongo/src/main/resources/reference.json
new file mode 100644
index 0000000..41c9a12
--- /dev/null
+++ b/streams-contrib/streams-persist-mongo/src/main/resources/reference.json
@@ -0,0 +1,8 @@
+{
+ "mongo": {
+ "host": "localhost",
+ "port": 27017,
+ "db": "local",
+ "collection": "startup_log"
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/74a38642/streams-contrib/streams-persist-mongo/src/main/resources/reference.properties
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-mongo/src/main/resources/reference.properties b/streams-contrib/streams-persist-mongo/src/main/resources/reference.properties
deleted file mode 100644
index 699f655..0000000
--- a/streams-contrib/streams-persist-mongo/src/main/resources/reference.properties
+++ /dev/null
@@ -1,10 +0,0 @@
-hbase.rootdir = "hdfs://localhost:8020/hbase"
-
-zookeeper.znode.parent = "/hbase"
-
-zookeeper.znode.rootserver = "localhost"
-
-hbase.zookeeper.quorum = "localhost"
-
-hbase.zookeeper.property.clientPort = 2181
-