You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/07/14 19:10:09 UTC
[04/14] incubator-usergrid git commit: Revert "adding offer instead of put"
Revert "adding offer instead of put"
This reverts commit a842d5700e9d374fc091a3085ddf5e7f30bf806a.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/de1daaeb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/de1daaeb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/de1daaeb
Branch: refs/heads/observable-query-fix
Commit: de1daaeb19c438f458cb608cfa9ede4ede5f8f97
Parents: 1adcfd2
Author: Shawn Feldman <sf...@apache.org>
Authored: Fri Jul 10 16:44:10 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Fri Jul 10 16:44:10 2015 -0600
----------------------------------------------------------------------
.../rx/ObservableToBlockingIteratorFactory.java | 107 +++++++------------
.../persistence/core/rx/OrderedMergeTest.java | 35 ++----
2 files changed, 46 insertions(+), 96 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/de1daaeb/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ObservableToBlockingIteratorFactory.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ObservableToBlockingIteratorFactory.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ObservableToBlockingIteratorFactory.java
index ec5056e..9807749 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ObservableToBlockingIteratorFactory.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ObservableToBlockingIteratorFactory.java
@@ -31,8 +31,6 @@ import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.TimeUnit;
/**
* Returns an Iterator that iterates over all items emitted by a specified Observable.
@@ -56,7 +54,7 @@ public final class ObservableToBlockingIteratorFactory {
* @return the iterator that could be used to iterate over the elements of the observable.
*/
public static <T> Iterator<T> toIterator(Observable<? extends T> source) {
- final BlockingQueue<Notification<? extends T>> notifications = new SynchronousQueue<>(true);
+ final BlockingQueue<Notification<? extends T>> notifications = new ArrayBlockingQueue<>(1);
// using subscribe instead of unsafeSubscribe since this is a BlockingObservable "final subscribe"
final Subscription subscription = source.materialize().subscribe(new Subscriber<Notification<? extends T>>() {
@@ -67,88 +65,61 @@ public final class ObservableToBlockingIteratorFactory {
@Override
public void onError(Throwable e) {
- boolean offerFinished = false;
try{
- do {
- offerFinished = notifications.offer(Notification.<T>createOnError(e), 1000, TimeUnit.MILLISECONDS);
- }while (!offerFinished && !this.isUnsubscribed());
- }catch (InterruptedException t){
+ notifications.put(Notification.<T>createOnError(e));
+ }catch (Exception t){
}
}
@Override
public void onNext(Notification<? extends T> args) {
- boolean offerFinished = false;
+ try{
+ notifications.put(args);
+ }catch (Exception t){
- try {
- do {
- offerFinished = notifications.offer(args, 1000, TimeUnit.MILLISECONDS);
- } while (!offerFinished && !this.isUnsubscribed());
+ }
+ }
+ });
- } catch (InterruptedException t) {
+ return new Iterator<T>() {
+ private Notification<? extends T> buf;
+ @Override
+ public boolean hasNext() {
+ if (buf == null) {
+ buf = take();
}
+ if (buf.isOnError()) {
+ throw Exceptions.propagate(buf.getThrowable());
+ }
+ return !buf.isOnCompleted();
}
@Override
- protected void finalize() throws Throwable {
- super.finalize();
+ public T next() {
+ if (hasNext()) {
+ T result = buf.getValue();
+ buf = null;
+ return result;
+ }
+ throw new NoSuchElementException();
}
- });
-
- return new ObservableBlockingIterator<T>(notifications,subscription);
- }
-
- private static class ObservableBlockingIterator<T> implements Iterator<T> {
- private final BlockingQueue<Notification<? extends T>> notifications;
- private final Subscription subscription;
- public ObservableBlockingIterator(BlockingQueue<Notification<? extends T>> notifications, Subscription subscription) {
- this.notifications = notifications;
- this.subscription = subscription;
- }
-
- private Notification<? extends T> buf;
-
- @Override
- public boolean hasNext() {
- if (buf == null) {
- buf = take();
- }
- if (buf.isOnError()) {
- throw Exceptions.propagate(buf.getThrowable());
- }
- return !buf.isOnCompleted();
- }
-
- @Override
- public T next() {
- if (hasNext()) {
- T result = buf.getValue();
- buf = null;
- return result;
- }
- throw new NoSuchElementException();
- }
-
- private Notification<? extends T> take() {
- try {
- return notifications.take();
- } catch (InterruptedException e) {
- subscription.unsubscribe();
- throw Exceptions.propagate(e);
+ private Notification<? extends T> take() {
+ try {
+ return notifications.take();
+ } catch (InterruptedException e) {
+ subscription.unsubscribe();
+ throw Exceptions.propagate(e);
+ }
}
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException("Read-only iterator");
- }
- @Override
- protected void finalize() throws Throwable {
- super.finalize();
- }
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException("Read-only iterator");
+ }
+ };
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/de1daaeb/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/rx/OrderedMergeTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/rx/OrderedMergeTest.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/rx/OrderedMergeTest.java
index a81ef8f..649ac7a 100644
--- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/rx/OrderedMergeTest.java
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/rx/OrderedMergeTest.java
@@ -547,7 +547,7 @@ public class OrderedMergeTest {
//pull from source
for (int i = 0; i < 10 && !subscriber.isUnsubscribed(); i++) {
//emit
- log.info("inner produce " + count);
+ log.info("loop " + count);
subscriber.onNext(count++);
}
}
@@ -559,34 +559,13 @@ public class OrderedMergeTest {
log.info("iteration " + o);
}).subscribeOn(Schedulers.io()));
//never
- for(int i =0; i<20;i++){
- Object it =iterator.next();
- log.info("iterate "+i);
- }
-
- iterator = ObservableToBlockingIteratorFactory.toIterator(Observable.create(subscriber -> {
- int count = 0;
- while (!subscriber.isUnsubscribed()) {
- //pull from source
- for (int i = 0; i < 10 && !subscriber.isUnsubscribed(); i++) {
- //emit
- log.info("inner produce " + count);
- subscriber.onNext(count++);
- }
- }
+ Object it =iterator.next();
+ it = iterator.next();
+ log.info("iterate");
+ it = iterator.next();
+ log.info("iterate");
- subscriber.onCompleted();
- })
- .onBackpressureBlock(1)
- .buffer(2)
- .doOnNext(o -> {
- log.info("iteration " + o);
- }).subscribeOn(Schedulers.io()));
- //never
- for(int i =0; i<20;i++){
- Object it =iterator.next();
- log.info("iterate "+i);
- }
+ Object size = it;
}