You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/07/03 01:45:11 UTC
[5/7] git commit: Updated based on PR #43 feedback
Updated based on PR #43 feedback
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/7b5e0efd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/7b5e0efd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/7b5e0efd
Branch: refs/heads/master
Commit: 7b5e0efd57caeb076f7ce8083ea2cfc38c995068
Parents: 1f1c1db
Author: sblackmon <sb...@w2odigital.com>
Authored: Wed Jul 2 16:26:32 2014 -0700
Committer: sblackmon <sb...@w2odigital.com>
Committed: Wed Jul 2 16:26:32 2014 -0700
----------------------------------------------------------------------
.../streams/mongo/MongoPersistReader.java | 25 +++++++
.../streams/mongo/MongoPersistReaderTask.java | 74 --------------------
2 files changed, 25 insertions(+), 74 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7b5e0efd/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistReader.java b/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistReader.java
index d19ccfa..4312c6f 100644
--- a/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistReader.java
+++ b/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistReader.java
@@ -242,4 +242,29 @@ public class MongoPersistReader implements StreamsPersistReader {
private Queue<StreamsDatum> constructQueue() {
return Queues.synchronizedQueue(new LinkedBlockingQueue<StreamsDatum>(10000));
}
+
+ public class MongoPersistReaderTask implements Runnable {
+
+ private MongoPersistReader reader;
+
+ public MongoPersistReaderTask(MongoPersistReader reader) {
+ this.reader = reader;
+ }
+
+ @Override
+ public void run() {
+
+ try {
+ while(reader.cursor.hasNext()) {
+ DBObject dbObject = reader.cursor.next();
+ StreamsDatum datum = reader.prepareDatum(dbObject);
+ reader.write(datum);
+ }
+ } finally {
+ reader.cursor.close();
+ }
+
+ }
+
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/7b5e0efd/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistReaderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistReaderTask.java b/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistReaderTask.java
deleted file mode 100644
index b66e628..0000000
--- a/streams-contrib/streams-persist-mongo/src/main/java/org/apache/streams/mongo/MongoPersistReaderTask.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.mongo;
-
-import com.google.common.base.Strings;
-import com.mongodb.DBObject;
-import org.apache.streams.core.DatumStatus;
-import org.apache.streams.core.StreamsDatum;
-import org.joda.time.DateTime;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedReader;
-import java.io.InputStreamReader;
-
-public class MongoPersistReaderTask implements Runnable {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(MongoPersistReaderTask.class);
-
- private MongoPersistReader reader;
-
- public MongoPersistReaderTask(MongoPersistReader reader) {
- this.reader = reader;
- }
-
- @Override
- public void run() {
-
- try {
- while(reader.cursor.hasNext()) {
- DBObject dbObject = reader.cursor.next();
- StreamsDatum datum = reader.prepareDatum(dbObject);
- write(datum);
- }
- } finally {
- reader.cursor.close();
- }
-
- }
-
- //The locking may appear to be counter intuitive but we really don't care if multiple threads offer to the queue
- //as it is a synchronized queue. What we do care about is that we don't want to be offering to the current reference
- //if the queue is being replaced with a new instance
- protected void write(StreamsDatum entry) {
- boolean success;
- do {
- try {
- reader.lock.readLock().lock();
- success = reader.persistQueue.offer(entry);
- Thread.yield();
- }finally {
- reader.lock.readLock().unlock();
- }
- }
- while (!success);
- }
-
-}