You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/07/11 00:19:58 UTC

[1/3] incubator-usergrid git commit: adding offer instead of put

Repository: incubator-usergrid
Updated Branches:
  refs/heads/two-dot-o-dev 48689eb5d -> 1adcfd292


adding offer instead of put


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/a842d570
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/a842d570
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/a842d570

Branch: refs/heads/two-dot-o-dev
Commit: a842d5700e9d374fc091a3085ddf5e7f30bf806a
Parents: 48689eb
Author: Shawn Feldman <sf...@apache.org>
Authored: Fri Jul 10 10:33:43 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Fri Jul 10 10:33:43 2015 -0600

----------------------------------------------------------------------
 .../rx/ObservableToBlockingIteratorFactory.java | 107 ++++++++++++-------
 .../persistence/core/rx/OrderedMergeTest.java   |  35 ++++--
 2 files changed, 96 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a842d570/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ObservableToBlockingIteratorFactory.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ObservableToBlockingIteratorFactory.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ObservableToBlockingIteratorFactory.java
index 9807749..ec5056e 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ObservableToBlockingIteratorFactory.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ObservableToBlockingIteratorFactory.java
@@ -31,6 +31,8 @@ import java.util.Iterator;
 import java.util.NoSuchElementException;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Returns an Iterator that iterates over all items emitted by a specified Observable.
@@ -54,7 +56,7 @@ public final class ObservableToBlockingIteratorFactory {
      * @return the iterator that could be used to iterate over the elements of the observable.
      */
     public static <T> Iterator<T> toIterator(Observable<? extends T> source) {
-        final BlockingQueue<Notification<? extends T>> notifications = new ArrayBlockingQueue<>(1);
+        final BlockingQueue<Notification<? extends T>> notifications = new SynchronousQueue<>(true);
 
         // using subscribe instead of unsafeSubscribe since this is a BlockingObservable "final subscribe"
         final Subscription subscription = source.materialize().subscribe(new Subscriber<Notification<? extends T>>() {
@@ -65,61 +67,88 @@ public final class ObservableToBlockingIteratorFactory {
 
             @Override
             public void onError(Throwable e) {
+                boolean offerFinished = false;
                 try{
-                    notifications.put(Notification.<T>createOnError(e));
-                }catch (Exception t){
+                    do {
+                        offerFinished = notifications.offer(Notification.<T>createOnError(e), 1000, TimeUnit.MILLISECONDS);
+                    }while (!offerFinished && !this.isUnsubscribed());
+                }catch (InterruptedException t){
 
                 }
             }
 
             @Override
             public void onNext(Notification<? extends T> args) {
-                try{
-                    notifications.put(args);
-                }catch (Exception t){
+                boolean offerFinished = false;
 
-                }
-            }
-        });
+                try {
+                    do {
+                        offerFinished =  notifications.offer(args, 1000, TimeUnit.MILLISECONDS);
+                    } while (!offerFinished && !this.isUnsubscribed());
 
-        return new Iterator<T>() {
-            private Notification<? extends T> buf;
+                } catch (InterruptedException t) {
 
-            @Override
-            public boolean hasNext() {
-                if (buf == null) {
-                    buf = take();
                 }
-                if (buf.isOnError()) {
-                    throw Exceptions.propagate(buf.getThrowable());
-                }
-                return !buf.isOnCompleted();
             }
 
             @Override
-            public T next() {
-                if (hasNext()) {
-                    T result = buf.getValue();
-                    buf = null;
-                    return result;
-                }
-                throw new NoSuchElementException();
+            protected void finalize() throws Throwable {
+                super.finalize();
             }
+        });
 
-            private Notification<? extends T> take() {
-                try {
-                    return notifications.take();
-                } catch (InterruptedException e) {
-                    subscription.unsubscribe();
-                    throw Exceptions.propagate(e);
-                }
-            }
+        return new ObservableBlockingIterator<T>(notifications,subscription);
+    }
 
-            @Override
-            public void remove() {
-                throw new UnsupportedOperationException("Read-only iterator");
+    private static class ObservableBlockingIterator<T> implements Iterator<T> {
+        private final BlockingQueue<Notification<? extends T>> notifications;
+        private final Subscription subscription;
+
+        public ObservableBlockingIterator(BlockingQueue<Notification<? extends T>> notifications, Subscription subscription) {
+            this.notifications = notifications;
+            this.subscription = subscription;
+        }
+
+        private Notification<? extends T> buf;
+
+        @Override
+        public boolean hasNext() {
+            if (buf == null) {
+                buf = take();
             }
-        };
-    }
+            if (buf.isOnError()) {
+                throw Exceptions.propagate(buf.getThrowable());
+            }
+            return !buf.isOnCompleted();
+        }
+
+        @Override
+        public T next() {
+            if (hasNext()) {
+                T result = buf.getValue();
+                buf = null;
+                return result;
+            }
+            throw new NoSuchElementException();
+        }
+
+        private Notification<? extends T> take() {
+            try {
+                return notifications.take();
+            } catch (InterruptedException e) {
+                subscription.unsubscribe();
+                throw Exceptions.propagate(e);
+            }
+        }
 
+        @Override
+        public void remove() {
+            throw new UnsupportedOperationException("Read-only iterator");
+        }
+
+        @Override
+        protected void finalize() throws Throwable {
+            super.finalize();
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a842d570/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/rx/OrderedMergeTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/rx/OrderedMergeTest.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/rx/OrderedMergeTest.java
index 649ac7a..a81ef8f 100644
--- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/rx/OrderedMergeTest.java
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/rx/OrderedMergeTest.java
@@ -547,7 +547,7 @@ public class OrderedMergeTest {
                 //pull from source
                 for (int i = 0; i < 10 && !subscriber.isUnsubscribed(); i++) {
                     //emit
-                    log.info("loop " + count);
+                    log.info("inner produce " + count);
                     subscriber.onNext(count++);
                 }
             }
@@ -559,13 +559,34 @@ public class OrderedMergeTest {
                 log.info("iteration " + o);
             }).subscribeOn(Schedulers.io()));
         //never
-        Object it =iterator.next();
-        it = iterator.next();
-        log.info("iterate");
-        it = iterator.next();
-        log.info("iterate");
+        for(int i =0; i<20;i++){
+            Object it =iterator.next();
+            log.info("iterate "+i);
+        }
+
+        iterator = ObservableToBlockingIteratorFactory.toIterator(Observable.create(subscriber -> {
+            int count = 0;
+            while (!subscriber.isUnsubscribed()) {
+                //pull from source
+                for (int i = 0; i < 10 && !subscriber.isUnsubscribed(); i++) {
+                    //emit
+                    log.info("inner produce " + count);
+                    subscriber.onNext(count++);
+                }
+            }
 
-        Object size = it;
+            subscriber.onCompleted();
+        })
+            .onBackpressureBlock(1)
+            .buffer(2)
+            .doOnNext(o -> {
+                log.info("iteration " + o);
+            }).subscribeOn(Schedulers.io()));
+        //never
+        for(int i =0; i<20;i++){
+            Object it =iterator.next();
+            log.info("iterate "+i);
+        }
     }
 
 


[3/3] incubator-usergrid git commit: Merge branch 'two-dot-o-dev' into pr/303

Posted by sf...@apache.org.
Merge branch 'two-dot-o-dev' into pr/303


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/1adcfd29
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/1adcfd29
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/1adcfd29

Branch: refs/heads/two-dot-o-dev
Commit: 1adcfd292ad42fdc65ace103fb8f3b54e51f2cb4
Parents: 7fbb9f5 a842d57
Author: Shawn Feldman <sf...@apache.org>
Authored: Fri Jul 10 16:19:19 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Fri Jul 10 16:19:19 2015 -0600

----------------------------------------------------------------------
 .../rx/ObservableToBlockingIteratorFactory.java | 107 ++++++++++++-------
 .../persistence/core/rx/OrderedMergeTest.java   |  35 ++++--
 2 files changed, 96 insertions(+), 46 deletions(-)
----------------------------------------------------------------------



[2/3] incubator-usergrid git commit: Update set data migration version resource to receive integers and echo back the post-update versions.

Posted by sf...@apache.org.
Update set data migration version resource to receive integers and echo back the post-update versions.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/7fbb9f54
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/7fbb9f54
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/7fbb9f54

Branch: refs/heads/two-dot-o-dev
Commit: 7fbb9f54823f84f34cf3ae92a7cea3f64ac650ec
Parents: 48689eb
Author: Michael Russo <mi...@gmail.com>
Authored: Fri Jul 10 15:09:47 2015 -0700
Committer: Michael Russo <mi...@gmail.com>
Committed: Fri Jul 10 15:09:47 2015 -0700

----------------------------------------------------------------------
 .../apache/usergrid/rest/MigrateResource.java   | 26 +++++++++++++++-----
 1 file changed, 20 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7fbb9f54/stack/rest/src/main/java/org/apache/usergrid/rest/MigrateResource.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/MigrateResource.java b/stack/rest/src/main/java/org/apache/usergrid/rest/MigrateResource.java
index ed0604a..da0ba0f 100644
--- a/stack/rest/src/main/java/org/apache/usergrid/rest/MigrateResource.java
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/MigrateResource.java
@@ -130,23 +130,37 @@ public class MigrateResource extends AbstractContextResource {
         Preconditions.checkNotNull( json, "You must provide a json body" );
         Preconditions.checkArgument( json.keySet().size() > 0, "You must specify at least one module and version" );
 
+        ApiResponse response = createApiResponse();
+        response.setAction("Set Migration Versions");
+
+        ObjectNode node = JsonNodeFactory.instance.objectNode();
+
+        final DataMigrationManager dataMigrationManager = getDataMigrationManager();
+        final Set<String> plugins = dataMigrationManager.getPluginNames();
+
         /**
          *  Set the migration version for the plugins specified
          */
         for ( final String key : json.keySet() ) {
-            String version = ( String ) json.get( key );
 
-            Preconditions.checkArgument( version != null && version.length() > 0,
-                "You must specify a version field per module name" );
+            int version = ( int ) json.get( key );
 
+            dataMigrationManager.resetToVersion(key, version);
+        }
 
-            int intVersion = Integer.parseInt( version );
 
-            getDataMigrationManager().resetToVersion( key, intVersion );
+        /**
+         *  Echo back a response of the current versions for all plugins
+         */
+        for(final String pluginName: plugins){
+            node.put(pluginName, dataMigrationManager.getCurrentVersion(pluginName));
         }
 
 
-        return migrateStatus( ui, callback );
+        response.setData( node );
+        response.setSuccess();
+
+        return new JSONWithPadding( response, callback );
     }