You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/07/03 01:45:11 UTC

[5/7] git commit: Updated based on PR #43 feedback

Updated based on PR #43 feedback


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/7b5e0efd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/7b5e0efd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/7b5e0efd

Branch: refs/heads/master
Commit: 7b5e0efd57caeb076f7ce8083ea2cfc38c995068
Parents: 1f1c1db
Author: sblackmon <sb...@w2odigital.com>
Authored: Wed Jul 2 16:26:32 2014 -0700
Committer: sblackmon <sb...@w2odigital.com>
Committed: Wed Jul 2 16:26:32 2014 -0700

----------------------------------------------------------------------
 .../streams/mongo/MongoPersistReader.java       | 25 +++++++
 .../streams/mongo/MongoPersistReaderTask.java   | 74 --------------------
 2 files changed, 25 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7b5e0efd/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistReader.java b/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistReader.java
index d19ccfa..4312c6f 100644
--- a/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistReader.java
+++ b/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistReader.java
@@ -242,4 +242,29 @@ public class MongoPersistReader implements StreamsPersistReader {
     private Queue<StreamsDatum> constructQueue() {
         return Queues.synchronizedQueue(new LinkedBlockingQueue<StreamsDatum>(10000));
     }
+
+    public class MongoPersistReaderTask implements Runnable {
+
+        private MongoPersistReader reader;
+
+        public MongoPersistReaderTask(MongoPersistReader reader) {
+            this.reader = reader;
+        }
+
+        @Override
+        public void run() {
+
+            try {
+                while(reader.cursor.hasNext()) {
+                    DBObject dbObject = reader.cursor.next();
+                    StreamsDatum datum = reader.prepareDatum(dbObject);
+                    reader.write(datum);
+                }
+            } finally {
+                reader.cursor.close();
+            }
+
+        }
+
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7b5e0efd/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistReaderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistReaderTask.java b/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistReaderTask.java
deleted file mode 100644
index b66e628..0000000
--- a/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistReaderTask.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.mongo;
-
-import com.google.common.base.Strings;
-import com.mongodb.DBObject;
-import org.apache.streams.core.DatumStatus;
-import org.apache.streams.core.StreamsDatum;
-import org.joda.time.DateTime;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedReader;
-import java.io.InputStreamReader;
-
-public class MongoPersistReaderTask implements Runnable {
-
-    private static final Logger LOGGER = LoggerFactory.getLogger(MongoPersistReaderTask.class);
-
-    private MongoPersistReader reader;
-
-    public MongoPersistReaderTask(MongoPersistReader reader) {
-        this.reader = reader;
-    }
-
-    @Override
-    public void run() {
-
-        try {
-            while(reader.cursor.hasNext()) {
-                DBObject dbObject = reader.cursor.next();
-                StreamsDatum datum = reader.prepareDatum(dbObject);
-                write(datum);
-            }
-        } finally {
-            reader.cursor.close();
-        }
-
-    }
-
-    //The locking may appear to be counter intuitive but we really don't care if multiple threads offer to the queue
-    //as it is a synchronized queue.  What we do care about is that we don't want to be offering to the current reference
-    //if the queue is being replaced with a new instance
-    protected void write(StreamsDatum entry) {
-        boolean success;
-        do {
-            try {
-                reader.lock.readLock().lock();
-                success = reader.persistQueue.offer(entry);
-                Thread.yield();
-            }finally {
-                reader.lock.readLock().unlock();
-            }
-        }
-        while (!success);
-    }
-
-}