You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/06/25 23:15:08 UTC

git commit: Created MongoReader. Tested as perpetual stream

Repository: incubator-streams
Updated Branches:
  refs/heads/STREAMS-47 [created] 74a38642e


Created MongoReader.  Tested as perpetual stream


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/74a38642
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/74a38642
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/74a38642

Branch: refs/heads/STREAMS-47
Commit: 74a38642e6eda05ee6603a66a1e0ba7ea35815d0
Parents: a33c215
Author: sblackmon <sb...@w2odigital.com>
Authored: Wed Jun 25 16:14:34 2014 -0500
Committer: sblackmon <sb...@w2odigital.com>
Committed: Wed Jun 25 16:14:34 2014 -0500

----------------------------------------------------------------------
 .../streams/mongo/MongoPersistReader.java       | 245 +++++++++++++++++++
 .../streams/mongo/MongoPersistReaderTask.java   |  74 ++++++
 .../src/main/resources/reference.json           |   8 +
 .../src/main/resources/reference.properties     |  10 -
 4 files changed, 327 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/74a38642/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistReader.java b/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistReader.java
new file mode 100644
index 0000000..d19ccfa
--- /dev/null
+++ b/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistReader.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.mongo;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Queues;
+import com.mongodb.*;
+import com.mongodb.util.JSON;
+import com.typesafe.config.Config;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.*;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.math.BigInteger;
+import java.net.UnknownHostException;
+import java.util.List;
+import java.util.Queue;
+import java.util.Random;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class MongoPersistReader implements StreamsPersistReader {
+
+    public static final String STREAMS_ID = "MongoPersistReader";
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(MongoPersistReader.class);
+    private final static long MAX_WRITE_LATENCY = 1000;
+
+    protected volatile Queue<StreamsDatum> persistQueue;
+
+    private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+    private volatile AtomicLong lastWrite = new AtomicLong(System.currentTimeMillis());
+
+    private ExecutorService executor;
+
+    private MongoConfiguration config;
+
+    protected DB client;
+    protected DBAddress dbaddress;
+    protected DBCollection collection;
+
+    protected DBCursor cursor;
+
+    protected List<DBObject> insertBatch = Lists.newArrayList();
+
+    protected final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+    public MongoPersistReader() {
+        Config config = StreamsConfigurator.config.getConfig("mongo");
+        this.config = MongoConfigurator.detectConfiguration(config);
+    }
+
+    public MongoPersistReader(MongoConfiguration config) {
+        this.config = config;
+    }
+
+    public MongoPersistReader(Queue<StreamsDatum> persistQueue) {
+        Config config = StreamsConfigurator.config.getConfig("mongo");
+        this.config = MongoConfigurator.detectConfiguration(config);
+        this.persistQueue = persistQueue;
+    }
+
+    public void setPersistQueue(Queue<StreamsDatum> persistQueue) {
+        this.persistQueue = persistQueue;
+    }
+
+    public Queue<StreamsDatum> getPersistQueue() {
+        return persistQueue;
+    }
+
+    public void stop() {
+
+        try {
+            client.cleanCursors(true);
+            client.requestDone();
+        } catch (Exception e) {
+        } finally {
+            client.requestDone();
+        }
+    }
+
+    @Override
+    public void prepare(Object configurationObject) {
+
+        connectToMongo();
+
+        if( client == null ||
+                dbaddress == null ||
+                collection == null )
+            throw new RuntimeException("Unable to connect!");
+
+        cursor = collection.find();
+
+        if( cursor == null ||
+            cursor.hasNext() == false )
+            throw new RuntimeException("Collection not present or empty!");
+
+        persistQueue = constructQueue();
+
+        executor = Executors.newSingleThreadExecutor();
+
+    }
+
+    @Override
+    public void cleanUp() {
+        stop();
+    }
+
+    protected StreamsDatum prepareDatum(DBObject dbObject) {
+
+        StreamsDatum datum = new StreamsDatum(dbObject.toString());
+
+        return datum;
+    }
+
+    private synchronized void connectToMongo() {
+
+        try {
+            dbaddress = new DBAddress(config.getHost(), config.getPort().intValue(), config.getDb());
+        } catch (UnknownHostException e) {
+            e.printStackTrace();
+            return;
+        }
+
+        client = MongoClient.connect(dbaddress);
+
+        if (!Strings.isNullOrEmpty(config.getUser()) && !Strings.isNullOrEmpty(config.getPassword()))
+            client.authenticate(config.getUser(), config.getPassword().toCharArray());
+
+        if (!client.collectionExists(config.getCollection())) {
+            client.createCollection(config.getCollection(), null);
+        }
+        ;
+
+        collection = client.getCollection(config.getCollection());
+    }
+
+    @Override
+    public StreamsResultSet readAll() {
+
+        DBCursor cursor = collection.find();
+        try {
+            while(cursor.hasNext()) {
+                DBObject dbObject = cursor.next();
+                StreamsDatum datum = prepareDatum(dbObject);
+                write(datum);
+            }
+        } finally {
+            cursor.close();
+        }
+
+        return readCurrent();
+    }
+
+    @Override
+    public void startStream() {
+
+        LOGGER.debug("startStream");
+        MongoPersistReaderTask readerTask = new MongoPersistReaderTask(this);
+        executor.submit(readerTask);
+        executor.shutdown();
+
+    }
+
+    @Override
+    public StreamsResultSet readCurrent() {
+
+        StreamsResultSet current;
+
+        try {
+            lock.writeLock().lock();
+            current = new StreamsResultSet(persistQueue);
+            current.setCounter(new DatumStatusCounter());
+//            current.getCounter().add(countersCurrent);
+//            countersTotal.add(countersCurrent);
+//            countersCurrent = new DatumStatusCounter();
+            persistQueue = constructQueue();
+        } finally {
+            lock.writeLock().unlock();
+        }
+
+        return current;
+    }
+
+    //The locking may appear to be counter intuitive but we really don't care if multiple threads offer to the queue
+    //as it is a synchronized queue.  What we do care about is that we don't want to be offering to the current reference
+    //if the queue is being replaced with a new instance
+    protected void write(StreamsDatum entry) {
+        boolean success;
+        do {
+            try {
+                lock.readLock().lock();
+                success = persistQueue.offer(entry);
+                Thread.yield();
+            } finally {
+                lock.readLock().unlock();
+            }
+        }
+        while (!success);
+    }
+
+    @Override
+    public StreamsResultSet readNew(BigInteger sequence) {
+        return null;
+    }
+
+    @Override
+    public StreamsResultSet readRange(DateTime start, DateTime end) {
+        return null;
+    }
+
+    @Override
+    public boolean isRunning() {
+        return !executor.isTerminated();
+    }
+
+    private Queue<StreamsDatum> constructQueue() {
+        return Queues.synchronizedQueue(new LinkedBlockingQueue<StreamsDatum>(10000));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/74a38642/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistReaderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistReaderTask.java b/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistReaderTask.java
new file mode 100644
index 0000000..b66e628
--- /dev/null
+++ b/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistReaderTask.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.mongo;
+
+import com.google.common.base.Strings;
+import com.mongodb.DBObject;
+import org.apache.streams.core.DatumStatus;
+import org.apache.streams.core.StreamsDatum;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+
+/**
+ * Runnable that walks the reader's cursor, converts each document to a
+ * {@link StreamsDatum} and queues it on the reader. The cursor is closed when the
+ * loop ends, whether normally or through an exception.
+ */
+public class MongoPersistReaderTask implements Runnable {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(MongoPersistReaderTask.class);
+
+    private final MongoPersistReader reader;
+
+    /**
+     * @param reader reader whose cursor is consumed and whose queue receives the data
+     */
+    public MongoPersistReaderTask(MongoPersistReader reader) {
+        this.reader = reader;
+    }
+
+    @Override
+    public void run() {
+
+        try {
+            while (reader.cursor.hasNext()) {
+                DBObject dbObject = reader.cursor.next();
+                StreamsDatum datum = reader.prepareDatum(dbObject);
+                write(datum);
+            }
+        } catch (RuntimeException e) {
+            // The reader discards the Future returned by submit(), so log the failure
+            // here before rethrowing; otherwise it would go unnoticed.
+            LOGGER.error("Mongo reader task failed", e);
+            throw e;
+        } finally {
+            reader.cursor.close();
+        }
+
+    }
+
+    /**
+     * Queues the datum on the reader. Delegates to the reader's own write(), which
+     * retries under the reader's read lock until the queue accepts the entry, so the
+     * retry logic lives in one place.
+     */
+    protected void write(StreamsDatum entry) {
+        reader.write(entry);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/74a38642/streams-contrib/streams-persist-mongo/src/main/resources/reference.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-mongo/src/main/resources/reference.json b/streams-contrib/streams-persist-mongo/src/main/resources/reference.json
new file mode 100644
index 0000000..41c9a12
--- /dev/null
+++ b/streams-contrib/streams-persist-mongo/src/main/resources/reference.json
@@ -0,0 +1,8 @@
+{
+    "mongo": {
+        "host": "localhost",
+        "port": 27017,
+        "db": "local",
+        "collection": "startup_log"
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/74a38642/streams-contrib/streams-persist-mongo/src/main/resources/reference.properties
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-mongo/src/main/resources/reference.properties b/streams-contrib/streams-persist-mongo/src/main/resources/reference.properties
deleted file mode 100644
index 699f655..0000000
--- a/streams-contrib/streams-persist-mongo/src/main/resources/reference.properties
+++ /dev/null
@@ -1,10 +0,0 @@
-hbase.rootdir = "hdfs://localhost:8020/hbase"
-
-zookeeper.znode.parent = "/hbase"
-
-zookeeper.znode.rootserver = "localhost"
-
-hbase.zookeeper.quorum = "localhost"
-
-hbase.zookeeper.property.clientPort = 2181
-