You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sf...@apache.org on 2015/07/11 00:19:58 UTC
[1/3] incubator-usergrid git commit: adding offer instead of put
Repository: incubator-usergrid
Updated Branches:
refs/heads/two-dot-o-dev 48689eb5d -> 1adcfd292
adding offer instead of put
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/a842d570
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/a842d570
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/a842d570
Branch: refs/heads/two-dot-o-dev
Commit: a842d5700e9d374fc091a3085ddf5e7f30bf806a
Parents: 48689eb
Author: Shawn Feldman <sf...@apache.org>
Authored: Fri Jul 10 10:33:43 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Fri Jul 10 10:33:43 2015 -0600
----------------------------------------------------------------------
.../rx/ObservableToBlockingIteratorFactory.java | 107 ++++++++++++-------
.../persistence/core/rx/OrderedMergeTest.java | 35 ++++--
2 files changed, 96 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a842d570/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ObservableToBlockingIteratorFactory.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ObservableToBlockingIteratorFactory.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ObservableToBlockingIteratorFactory.java
index 9807749..ec5056e 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ObservableToBlockingIteratorFactory.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ObservableToBlockingIteratorFactory.java
@@ -31,6 +31,8 @@ import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.TimeUnit;
/**
* Returns an Iterator that iterates over all items emitted by a specified Observable.
@@ -54,7 +56,7 @@ public final class ObservableToBlockingIteratorFactory {
* @return the iterator that could be used to iterate over the elements of the observable.
*/
public static <T> Iterator<T> toIterator(Observable<? extends T> source) {
- final BlockingQueue<Notification<? extends T>> notifications = new ArrayBlockingQueue<>(1);
+ final BlockingQueue<Notification<? extends T>> notifications = new SynchronousQueue<>(true);
// using subscribe instead of unsafeSubscribe since this is a BlockingObservable "final subscribe"
final Subscription subscription = source.materialize().subscribe(new Subscriber<Notification<? extends T>>() {
@@ -65,61 +67,88 @@ public final class ObservableToBlockingIteratorFactory {
@Override
public void onError(Throwable e) {
+ boolean offerFinished = false;
try{
- notifications.put(Notification.<T>createOnError(e));
- }catch (Exception t){
+ do {
+ offerFinished = notifications.offer(Notification.<T>createOnError(e), 1000, TimeUnit.MILLISECONDS);
+ }while (!offerFinished && !this.isUnsubscribed());
+ }catch (InterruptedException t){
}
}
@Override
public void onNext(Notification<? extends T> args) {
- try{
- notifications.put(args);
- }catch (Exception t){
+ boolean offerFinished = false;
- }
- }
- });
+ try {
+ do {
+ offerFinished = notifications.offer(args, 1000, TimeUnit.MILLISECONDS);
+ } while (!offerFinished && !this.isUnsubscribed());
- return new Iterator<T>() {
- private Notification<? extends T> buf;
+ } catch (InterruptedException t) {
- @Override
- public boolean hasNext() {
- if (buf == null) {
- buf = take();
}
- if (buf.isOnError()) {
- throw Exceptions.propagate(buf.getThrowable());
- }
- return !buf.isOnCompleted();
}
@Override
- public T next() {
- if (hasNext()) {
- T result = buf.getValue();
- buf = null;
- return result;
- }
- throw new NoSuchElementException();
+ protected void finalize() throws Throwable {
+ super.finalize();
}
+ });
- private Notification<? extends T> take() {
- try {
- return notifications.take();
- } catch (InterruptedException e) {
- subscription.unsubscribe();
- throw Exceptions.propagate(e);
- }
- }
+ return new ObservableBlockingIterator<T>(notifications,subscription);
+ }
- @Override
- public void remove() {
- throw new UnsupportedOperationException("Read-only iterator");
+ private static class ObservableBlockingIterator<T> implements Iterator<T> {
+ private final BlockingQueue<Notification<? extends T>> notifications;
+ private final Subscription subscription;
+
+ public ObservableBlockingIterator(BlockingQueue<Notification<? extends T>> notifications, Subscription subscription) {
+ this.notifications = notifications;
+ this.subscription = subscription;
+ }
+
+ private Notification<? extends T> buf;
+
+ @Override
+ public boolean hasNext() {
+ if (buf == null) {
+ buf = take();
}
- };
- }
+ if (buf.isOnError()) {
+ throw Exceptions.propagate(buf.getThrowable());
+ }
+ return !buf.isOnCompleted();
+ }
+
+ @Override
+ public T next() {
+ if (hasNext()) {
+ T result = buf.getValue();
+ buf = null;
+ return result;
+ }
+ throw new NoSuchElementException();
+ }
+
+ private Notification<? extends T> take() {
+ try {
+ return notifications.take();
+ } catch (InterruptedException e) {
+ subscription.unsubscribe();
+ throw Exceptions.propagate(e);
+ }
+ }
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException("Read-only iterator");
+ }
+
+ @Override
+ protected void finalize() throws Throwable {
+ super.finalize();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a842d570/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/rx/OrderedMergeTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/rx/OrderedMergeTest.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/rx/OrderedMergeTest.java
index 649ac7a..a81ef8f 100644
--- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/rx/OrderedMergeTest.java
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/rx/OrderedMergeTest.java
@@ -547,7 +547,7 @@ public class OrderedMergeTest {
//pull from source
for (int i = 0; i < 10 && !subscriber.isUnsubscribed(); i++) {
//emit
- log.info("loop " + count);
+ log.info("inner produce " + count);
subscriber.onNext(count++);
}
}
@@ -559,13 +559,34 @@ public class OrderedMergeTest {
log.info("iteration " + o);
}).subscribeOn(Schedulers.io()));
//never
- Object it =iterator.next();
- it = iterator.next();
- log.info("iterate");
- it = iterator.next();
- log.info("iterate");
+ for(int i =0; i<20;i++){
+ Object it =iterator.next();
+ log.info("iterate "+i);
+ }
+
+ iterator = ObservableToBlockingIteratorFactory.toIterator(Observable.create(subscriber -> {
+ int count = 0;
+ while (!subscriber.isUnsubscribed()) {
+ //pull from source
+ for (int i = 0; i < 10 && !subscriber.isUnsubscribed(); i++) {
+ //emit
+ log.info("inner produce " + count);
+ subscriber.onNext(count++);
+ }
+ }
- Object size = it;
+ subscriber.onCompleted();
+ })
+ .onBackpressureBlock(1)
+ .buffer(2)
+ .doOnNext(o -> {
+ log.info("iteration " + o);
+ }).subscribeOn(Schedulers.io()));
+ //never
+ for(int i =0; i<20;i++){
+ Object it =iterator.next();
+ log.info("iterate "+i);
+ }
}
[3/3] incubator-usergrid git commit: Merge branch 'two-dot-o-dev'
into pr/303
Posted by sf...@apache.org.
Merge branch 'two-dot-o-dev' into pr/303
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/1adcfd29
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/1adcfd29
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/1adcfd29
Branch: refs/heads/two-dot-o-dev
Commit: 1adcfd292ad42fdc65ace103fb8f3b54e51f2cb4
Parents: 7fbb9f5 a842d57
Author: Shawn Feldman <sf...@apache.org>
Authored: Fri Jul 10 16:19:19 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Fri Jul 10 16:19:19 2015 -0600
----------------------------------------------------------------------
.../rx/ObservableToBlockingIteratorFactory.java | 107 ++++++++++++-------
.../persistence/core/rx/OrderedMergeTest.java | 35 ++++--
2 files changed, 96 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
[2/3] incubator-usergrid git commit: Update set data migration
version resource to receive integers and echo back the post-update versions.
Posted by sf...@apache.org.
Update set data migration version resource to receive integers and echo back the post-update versions.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/7fbb9f54
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/7fbb9f54
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/7fbb9f54
Branch: refs/heads/two-dot-o-dev
Commit: 7fbb9f54823f84f34cf3ae92a7cea3f64ac650ec
Parents: 48689eb
Author: Michael Russo <mi...@gmail.com>
Authored: Fri Jul 10 15:09:47 2015 -0700
Committer: Michael Russo <mi...@gmail.com>
Committed: Fri Jul 10 15:09:47 2015 -0700
----------------------------------------------------------------------
.../apache/usergrid/rest/MigrateResource.java | 26 +++++++++++++++-----
1 file changed, 20 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7fbb9f54/stack/rest/src/main/java/org/apache/usergrid/rest/MigrateResource.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/MigrateResource.java b/stack/rest/src/main/java/org/apache/usergrid/rest/MigrateResource.java
index ed0604a..da0ba0f 100644
--- a/stack/rest/src/main/java/org/apache/usergrid/rest/MigrateResource.java
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/MigrateResource.java
@@ -130,23 +130,37 @@ public class MigrateResource extends AbstractContextResource {
Preconditions.checkNotNull( json, "You must provide a json body" );
Preconditions.checkArgument( json.keySet().size() > 0, "You must specify at least one module and version" );
+ ApiResponse response = createApiResponse();
+ response.setAction("Set Migration Versions");
+
+ ObjectNode node = JsonNodeFactory.instance.objectNode();
+
+ final DataMigrationManager dataMigrationManager = getDataMigrationManager();
+ final Set<String> plugins = dataMigrationManager.getPluginNames();
+
/**
* Set the migration version for the plugins specified
*/
for ( final String key : json.keySet() ) {
- String version = ( String ) json.get( key );
- Preconditions.checkArgument( version != null && version.length() > 0,
- "You must specify a version field per module name" );
+ int version = ( int ) json.get( key );
+ dataMigrationManager.resetToVersion(key, version);
+ }
- int intVersion = Integer.parseInt( version );
- getDataMigrationManager().resetToVersion( key, intVersion );
+ /**
+ * Echo back a response of the current versions for all plugins
+ */
+ for(final String pluginName: plugins){
+ node.put(pluginName, dataMigrationManager.getCurrentVersion(pluginName));
}
- return migrateStatus( ui, callback );
+ response.setData( node );
+ response.setSuccess();
+
+ return new JSONWithPadding( response, callback );
}