You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/07/14 19:10:09 UTC

[04/14] incubator-usergrid git commit: Revert "adding offer instead of put"

Revert "adding offer instead of put"

This reverts commit a842d5700e9d374fc091a3085ddf5e7f30bf806a.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/de1daaeb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/de1daaeb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/de1daaeb

Branch: refs/heads/observable-query-fix
Commit: de1daaeb19c438f458cb608cfa9ede4ede5f8f97
Parents: 1adcfd2
Author: Shawn Feldman <sf...@apache.org>
Authored: Fri Jul 10 16:44:10 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Fri Jul 10 16:44:10 2015 -0600

----------------------------------------------------------------------
 .../rx/ObservableToBlockingIteratorFactory.java | 107 +++++++------------
 .../persistence/core/rx/OrderedMergeTest.java   |  35 ++----
 2 files changed, 46 insertions(+), 96 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/de1daaeb/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ObservableToBlockingIteratorFactory.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ObservableToBlockingIteratorFactory.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ObservableToBlockingIteratorFactory.java
index ec5056e..9807749 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ObservableToBlockingIteratorFactory.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ObservableToBlockingIteratorFactory.java
@@ -31,8 +31,6 @@ import java.util.Iterator;
 import java.util.NoSuchElementException;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.TimeUnit;
 
 /**
  * Returns an Iterator that iterates over all items emitted by a specified Observable.
@@ -56,7 +54,7 @@ public final class ObservableToBlockingIteratorFactory {
      * @return the iterator that could be used to iterate over the elements of the observable.
      */
     public static <T> Iterator<T> toIterator(Observable<? extends T> source) {
-        final BlockingQueue<Notification<? extends T>> notifications = new SynchronousQueue<>(true);
+        final BlockingQueue<Notification<? extends T>> notifications = new ArrayBlockingQueue<>(1);
 
         // using subscribe instead of unsafeSubscribe since this is a BlockingObservable "final subscribe"
         final Subscription subscription = source.materialize().subscribe(new Subscriber<Notification<? extends T>>() {
@@ -67,88 +65,61 @@ public final class ObservableToBlockingIteratorFactory {
 
             @Override
             public void onError(Throwable e) {
-                boolean offerFinished = false;
                 try{
-                    do {
-                        offerFinished = notifications.offer(Notification.<T>createOnError(e), 1000, TimeUnit.MILLISECONDS);
-                    }while (!offerFinished && !this.isUnsubscribed());
-                }catch (InterruptedException t){
+                    notifications.put(Notification.<T>createOnError(e));
+                }catch (Exception t){
 
                 }
             }
 
             @Override
             public void onNext(Notification<? extends T> args) {
-                boolean offerFinished = false;
+                try{
+                    notifications.put(args);
+                }catch (Exception t){
 
-                try {
-                    do {
-                        offerFinished =  notifications.offer(args, 1000, TimeUnit.MILLISECONDS);
-                    } while (!offerFinished && !this.isUnsubscribed());
+                }
+            }
+        });
 
-                } catch (InterruptedException t) {
+        return new Iterator<T>() {
+            private Notification<? extends T> buf;
 
+            @Override
+            public boolean hasNext() {
+                if (buf == null) {
+                    buf = take();
                 }
+                if (buf.isOnError()) {
+                    throw Exceptions.propagate(buf.getThrowable());
+                }
+                return !buf.isOnCompleted();
             }
 
             @Override
-            protected void finalize() throws Throwable {
-                super.finalize();
+            public T next() {
+                if (hasNext()) {
+                    T result = buf.getValue();
+                    buf = null;
+                    return result;
+                }
+                throw new NoSuchElementException();
             }
-        });
-
-        return new ObservableBlockingIterator<T>(notifications,subscription);
-    }
-
-    private static class ObservableBlockingIterator<T> implements Iterator<T> {
-        private final BlockingQueue<Notification<? extends T>> notifications;
-        private final Subscription subscription;
 
-        public ObservableBlockingIterator(BlockingQueue<Notification<? extends T>> notifications, Subscription subscription) {
-            this.notifications = notifications;
-            this.subscription = subscription;
-        }
-
-        private Notification<? extends T> buf;
-
-        @Override
-        public boolean hasNext() {
-            if (buf == null) {
-                buf = take();
-            }
-            if (buf.isOnError()) {
-                throw Exceptions.propagate(buf.getThrowable());
-            }
-            return !buf.isOnCompleted();
-        }
-
-        @Override
-        public T next() {
-            if (hasNext()) {
-                T result = buf.getValue();
-                buf = null;
-                return result;
-            }
-            throw new NoSuchElementException();
-        }
-
-        private Notification<? extends T> take() {
-            try {
-                return notifications.take();
-            } catch (InterruptedException e) {
-                subscription.unsubscribe();
-                throw Exceptions.propagate(e);
+            private Notification<? extends T> take() {
+                try {
+                    return notifications.take();
+                } catch (InterruptedException e) {
+                    subscription.unsubscribe();
+                    throw Exceptions.propagate(e);
+                }
             }
-        }
-
-        @Override
-        public void remove() {
-            throw new UnsupportedOperationException("Read-only iterator");
-        }
 
-        @Override
-        protected void finalize() throws Throwable {
-            super.finalize();
-        }
+            @Override
+            public void remove() {
+                throw new UnsupportedOperationException("Read-only iterator");
+            }
+        };
     }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/de1daaeb/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/rx/OrderedMergeTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/rx/OrderedMergeTest.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/rx/OrderedMergeTest.java
index a81ef8f..649ac7a 100644
--- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/rx/OrderedMergeTest.java
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/rx/OrderedMergeTest.java
@@ -547,7 +547,7 @@ public class OrderedMergeTest {
                 //pull from source
                 for (int i = 0; i < 10 && !subscriber.isUnsubscribed(); i++) {
                     //emit
-                    log.info("inner produce " + count);
+                    log.info("loop " + count);
                     subscriber.onNext(count++);
                 }
             }
@@ -559,34 +559,13 @@ public class OrderedMergeTest {
                 log.info("iteration " + o);
             }).subscribeOn(Schedulers.io()));
         //never
-        for(int i =0; i<20;i++){
-            Object it =iterator.next();
-            log.info("iterate "+i);
-        }
-
-        iterator = ObservableToBlockingIteratorFactory.toIterator(Observable.create(subscriber -> {
-            int count = 0;
-            while (!subscriber.isUnsubscribed()) {
-                //pull from source
-                for (int i = 0; i < 10 && !subscriber.isUnsubscribed(); i++) {
-                    //emit
-                    log.info("inner produce " + count);
-                    subscriber.onNext(count++);
-                }
-            }
+        Object it =iterator.next();
+        it = iterator.next();
+        log.info("iterate");
+        it = iterator.next();
+        log.info("iterate");
 
-            subscriber.onCompleted();
-        })
-            .onBackpressureBlock(1)
-            .buffer(2)
-            .doOnNext(o -> {
-                log.info("iteration " + o);
-            }).subscribeOn(Schedulers.io()));
-        //never
-        for(int i =0; i<20;i++){
-            Object it =iterator.next();
-            log.info("iterate "+i);
-        }
+        Object size = it;
     }