You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/07/14 19:10:06 UTC
[01/14] incubator-usergrid git commit: adding offer instead of put
Repository: incubator-usergrid
Updated Branches:
refs/heads/observable-query-fix 208332da9 -> 7256ea731
adding offer instead of put
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/a842d570
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/a842d570
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/a842d570
Branch: refs/heads/observable-query-fix
Commit: a842d5700e9d374fc091a3085ddf5e7f30bf806a
Parents: 48689eb
Author: Shawn Feldman <sf...@apache.org>
Authored: Fri Jul 10 10:33:43 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Fri Jul 10 10:33:43 2015 -0600
----------------------------------------------------------------------
.../rx/ObservableToBlockingIteratorFactory.java | 107 ++++++++++++-------
.../persistence/core/rx/OrderedMergeTest.java | 35 ++++--
2 files changed, 96 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a842d570/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ObservableToBlockingIteratorFactory.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ObservableToBlockingIteratorFactory.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ObservableToBlockingIteratorFactory.java
index 9807749..ec5056e 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ObservableToBlockingIteratorFactory.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ObservableToBlockingIteratorFactory.java
@@ -31,6 +31,8 @@ import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.TimeUnit;
/**
* Returns an Iterator that iterates over all items emitted by a specified Observable.
@@ -54,7 +56,7 @@ public final class ObservableToBlockingIteratorFactory {
* @return the iterator that could be used to iterate over the elements of the observable.
*/
public static <T> Iterator<T> toIterator(Observable<? extends T> source) {
- final BlockingQueue<Notification<? extends T>> notifications = new ArrayBlockingQueue<>(1);
+ final BlockingQueue<Notification<? extends T>> notifications = new SynchronousQueue<>(true);
// using subscribe instead of unsafeSubscribe since this is a BlockingObservable "final subscribe"
final Subscription subscription = source.materialize().subscribe(new Subscriber<Notification<? extends T>>() {
@@ -65,61 +67,88 @@ public final class ObservableToBlockingIteratorFactory {
@Override
public void onError(Throwable e) {
+ boolean offerFinished = false;
try{
- notifications.put(Notification.<T>createOnError(e));
- }catch (Exception t){
+ do {
+ offerFinished = notifications.offer(Notification.<T>createOnError(e), 1000, TimeUnit.MILLISECONDS);
+ }while (!offerFinished && !this.isUnsubscribed());
+ }catch (InterruptedException t){
}
}
@Override
public void onNext(Notification<? extends T> args) {
- try{
- notifications.put(args);
- }catch (Exception t){
+ boolean offerFinished = false;
- }
- }
- });
+ try {
+ do {
+ offerFinished = notifications.offer(args, 1000, TimeUnit.MILLISECONDS);
+ } while (!offerFinished && !this.isUnsubscribed());
- return new Iterator<T>() {
- private Notification<? extends T> buf;
+ } catch (InterruptedException t) {
- @Override
- public boolean hasNext() {
- if (buf == null) {
- buf = take();
}
- if (buf.isOnError()) {
- throw Exceptions.propagate(buf.getThrowable());
- }
- return !buf.isOnCompleted();
}
@Override
- public T next() {
- if (hasNext()) {
- T result = buf.getValue();
- buf = null;
- return result;
- }
- throw new NoSuchElementException();
+ protected void finalize() throws Throwable {
+ super.finalize();
}
+ });
- private Notification<? extends T> take() {
- try {
- return notifications.take();
- } catch (InterruptedException e) {
- subscription.unsubscribe();
- throw Exceptions.propagate(e);
- }
- }
+ return new ObservableBlockingIterator<T>(notifications,subscription);
+ }
- @Override
- public void remove() {
- throw new UnsupportedOperationException("Read-only iterator");
+ private static class ObservableBlockingIterator<T> implements Iterator<T> {
+ private final BlockingQueue<Notification<? extends T>> notifications;
+ private final Subscription subscription;
+
+ public ObservableBlockingIterator(BlockingQueue<Notification<? extends T>> notifications, Subscription subscription) {
+ this.notifications = notifications;
+ this.subscription = subscription;
+ }
+
+ private Notification<? extends T> buf;
+
+ @Override
+ public boolean hasNext() {
+ if (buf == null) {
+ buf = take();
}
- };
- }
+ if (buf.isOnError()) {
+ throw Exceptions.propagate(buf.getThrowable());
+ }
+ return !buf.isOnCompleted();
+ }
+
+ @Override
+ public T next() {
+ if (hasNext()) {
+ T result = buf.getValue();
+ buf = null;
+ return result;
+ }
+ throw new NoSuchElementException();
+ }
+
+ private Notification<? extends T> take() {
+ try {
+ return notifications.take();
+ } catch (InterruptedException e) {
+ subscription.unsubscribe();
+ throw Exceptions.propagate(e);
+ }
+ }
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException("Read-only iterator");
+ }
+
+ @Override
+ protected void finalize() throws Throwable {
+ super.finalize();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a842d570/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/rx/OrderedMergeTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/rx/OrderedMergeTest.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/rx/OrderedMergeTest.java
index 649ac7a..a81ef8f 100644
--- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/rx/OrderedMergeTest.java
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/rx/OrderedMergeTest.java
@@ -547,7 +547,7 @@ public class OrderedMergeTest {
//pull from source
for (int i = 0; i < 10 && !subscriber.isUnsubscribed(); i++) {
//emit
- log.info("loop " + count);
+ log.info("inner produce " + count);
subscriber.onNext(count++);
}
}
@@ -559,13 +559,34 @@ public class OrderedMergeTest {
log.info("iteration " + o);
}).subscribeOn(Schedulers.io()));
//never
- Object it =iterator.next();
- it = iterator.next();
- log.info("iterate");
- it = iterator.next();
- log.info("iterate");
+ for(int i =0; i<20;i++){
+ Object it =iterator.next();
+ log.info("iterate "+i);
+ }
+
+ iterator = ObservableToBlockingIteratorFactory.toIterator(Observable.create(subscriber -> {
+ int count = 0;
+ while (!subscriber.isUnsubscribed()) {
+ //pull from source
+ for (int i = 0; i < 10 && !subscriber.isUnsubscribed(); i++) {
+ //emit
+ log.info("inner produce " + count);
+ subscriber.onNext(count++);
+ }
+ }
- Object size = it;
+ subscriber.onCompleted();
+ })
+ .onBackpressureBlock(1)
+ .buffer(2)
+ .doOnNext(o -> {
+ log.info("iteration " + o);
+ }).subscribeOn(Schedulers.io()));
+ //never
+ for(int i =0; i<20;i++){
+ Object it =iterator.next();
+ log.info("iterate "+i);
+ }
}
[09/14] incubator-usergrid git commit: fix deserialization
Posted by to...@apache.org.
fix deserialization
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/68cafb7e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/68cafb7e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/68cafb7e
Branch: refs/heads/observable-query-fix
Commit: 68cafb7e878eb853f280b7374f53dfd499b0dcac
Parents: 09873ce
Author: Shawn Feldman <sf...@apache.org>
Authored: Mon Jul 13 16:57:51 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Mon Jul 13 16:57:51 2015 -0600
----------------------------------------------------------------------
.../asyncevents/AmazonAsyncEventService.java | 2 +-
.../asyncevents/model/AsyncEvent.java | 7 ++--
.../index/ReplicatedIndexLocationStrategy.java | 38 ++++++++++++++++++--
.../queue/impl/SNSQueueManagerImpl.java | 5 +--
4 files changed, 45 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/68cafb7e/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index 1390058..70eb361 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -235,9 +235,9 @@ public class AmazonAsyncEventService implements AsyncEventService {
handleEntityIndexUpdate(message);
break;
-
case APPLICATION_INDEX:
handleInitializeApplicationIndex(message);
+ break;
default:
logger.error("Unknown EventType: {}", event.getEventType());
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/68cafb7e/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
index 3fabc1c..3d22986 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
@@ -21,7 +21,9 @@ package org.apache.usergrid.corepersistence.asyncevents.model;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import org.apache.usergrid.corepersistence.index.ReplicatedIndexLocationStrategy;
import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.graph.Edge;
@@ -136,10 +138,11 @@ public class AsyncEvent implements Serializable {
this.applicationScope = applicationScope;
}
- @JsonSerialize
+ @JsonSerialize()
+ @JsonDeserialize(as=ReplicatedIndexLocationStrategy.class)
public IndexLocationStrategy getIndexLocationStrategy() { return indexLocationStrategy; }
- public void setIndexLocationStrategy( IndexLocationStrategy indexLocationStrategy ){
+ protected void setIndexLocationStrategy( IndexLocationStrategy indexLocationStrategy ){
this.indexLocationStrategy = indexLocationStrategy;
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/68cafb7e/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReplicatedIndexLocationStrategy.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReplicatedIndexLocationStrategy.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReplicatedIndexLocationStrategy.java
index b404a78..e1b6b9e 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReplicatedIndexLocationStrategy.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReplicatedIndexLocationStrategy.java
@@ -19,6 +19,8 @@
*/
package org.apache.usergrid.corepersistence.index;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.index.IndexAlias;
import org.apache.usergrid.persistence.index.IndexLocationStrategy;
@@ -28,7 +30,7 @@ import org.apache.usergrid.persistence.index.IndexLocationStrategy;
*/
public class ReplicatedIndexLocationStrategy implements IndexLocationStrategy {
- private ReplicatedIndexAlias alias;
+ private IndexAlias alias;
private String rootName;
private String indexInitialName;
private ApplicationScope applicationScope;
@@ -49,35 +51,65 @@ public class ReplicatedIndexLocationStrategy implements IndexLocationStrategy {
}
@Override
+ @JsonSerialize()
+ @JsonDeserialize(as=ReplicatedIndexAlias.class)
public IndexAlias getAlias() {
return alias;
}
+ protected void setAlias(IndexAlias alias) {
+ this.alias = alias;
+ }
+
+
@Override
+ @JsonSerialize()
public String getIndexRootName() {
return rootName;
}
+ protected void setIndexRootName(String indexRootName) {
+ this.rootName = indexRootName;
+ }
@Override
+ @JsonSerialize()
public String getIndexInitialName() {
return indexInitialName;
}
+ protected void setIndexInitialName(String indexInitialName) {
+ this.indexInitialName = indexInitialName;
+ }
+
+
@Override
+ @JsonSerialize()
public ApplicationScope getApplicationScope() {
return applicationScope;
}
+ protected void setApplicationScope(ApplicationScope applicationScope) {
+ this.applicationScope = applicationScope;
+ }
+
+
@Override
+ @JsonSerialize()
public int getNumberOfShards() {
return numberShards;
}
-
+ public void setNumberOfShards(int shards) {
+ numberShards = shards;
+ }
@Override
+ @JsonSerialize()
public int getNumberOfReplicas() {
return numberReplicas;
}
+ public void setNumberOfReplicas(int replicas) {
+ numberReplicas = replicas;
+ }
public static class ReplicatedIndexAlias implements IndexAlias{
private String readAlias;
@@ -91,11 +123,13 @@ public class ReplicatedIndexLocationStrategy implements IndexLocationStrategy {
this.writeAlias = alias.getWriteAlias();
}
@Override
+ @JsonSerialize()
public String getReadAlias() {
return readAlias;
}
@Override
+ @JsonSerialize()
public String getWriteAlias() {
return writeAlias;
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/68cafb7e/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
index 257f25e..1214a39 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
@@ -321,7 +321,8 @@ public class SNSQueueManagerImpl implements QueueManager {
Object body;
try {
- JsonNode bodyObj = mapper.readTree(message.getBody()).get("Message");
+ final JsonNode bodyNode = mapper.readTree(message.getBody());
+ JsonNode bodyObj = bodyNode.has("Message") ? bodyNode.get("Message") : bodyNode;
body = fromString(bodyObj.textValue(), klass);
} catch (Exception e) {
logger.error(String.format("failed to deserialize message: %s", message.getBody()), e);
@@ -371,7 +372,7 @@ public class SNSQueueManagerImpl implements QueueManager {
if (logger.isDebugEnabled()) logger.debug("Publishing Message...{} to arn: {}", stringBody, topicArn);
- PublishRequest publishRequest = new PublishRequest(topicArn, toString(body));
+ PublishRequest publishRequest = new PublishRequest(topicArn, stringBody);
sns.publishAsync(publishRequest, new AsyncHandler<PublishRequest, PublishResult>() {
@Override
[11/14] incubator-usergrid git commit: change comment,
add replicated test
Posted by to...@apache.org.
change comment, add replicated test
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/d342875c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/d342875c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/d342875c
Branch: refs/heads/observable-query-fix
Commit: d342875c4c910e49bf4cc48054e5fa99c0e88e70
Parents: 68cafb7
Author: Shawn Feldman <sf...@apache.org>
Authored: Mon Jul 13 17:43:34 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Mon Jul 13 17:43:34 2015 -0600
----------------------------------------------------------------------
.../index/ReplicatedIndexLocationStrategy.java | 2 +-
.../corepersistence/index/IndexNamingTest.java | 18 +++++++++++++++++-
2 files changed, 18 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d342875c/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReplicatedIndexLocationStrategy.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReplicatedIndexLocationStrategy.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReplicatedIndexLocationStrategy.java
index e1b6b9e..4dd237a 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReplicatedIndexLocationStrategy.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReplicatedIndexLocationStrategy.java
@@ -26,7 +26,7 @@ import org.apache.usergrid.persistence.index.IndexAlias;
import org.apache.usergrid.persistence.index.IndexLocationStrategy;
/**
- * Classy class class.
+ * Strategy to replicate an index naming convention and publish elsewhere
*/
public class ReplicatedIndexLocationStrategy implements IndexLocationStrategy {
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d342875c/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/IndexNamingTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/IndexNamingTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/IndexNamingTest.java
index 2d77697..95ad17e 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/IndexNamingTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/IndexNamingTest.java
@@ -20,6 +20,7 @@
package org.apache.usergrid.corepersistence.index;
import com.google.inject.Inject;
+import junit.framework.Assert;
import net.jcip.annotations.NotThreadSafe;
import org.apache.usergrid.corepersistence.TestIndexModule;
import org.apache.usergrid.corepersistence.util.CpNamingUtils;
@@ -173,7 +174,22 @@ public class IndexNamingTest {
names.add(name);
}
//always hashes to diff't bucket
- assertTrue(names.size()==indexProcessorFig.getNumberOfIndexBuckets());
+ assertTrue(names.size() == indexProcessorFig.getNumberOfIndexBuckets());
+ }
+
+ @Test
+ public void testReplication(){
+ IndexLocationStrategy indexLocationStrategy = indexLocationStrategyFactory.getIndexLocationStrategy(applicationScope);
+ ReplicatedIndexLocationStrategy replicatedIndexLocationStrategy = new ReplicatedIndexLocationStrategy(indexLocationStrategy);
+ assertEquals(replicatedIndexLocationStrategy.getApplicationScope(),indexLocationStrategy.getApplicationScope());
+ assertEquals(replicatedIndexLocationStrategy.getIndexInitialName(),indexLocationStrategy.getIndexInitialName());
+ assertEquals(replicatedIndexLocationStrategy.getIndexRootName(),indexLocationStrategy.getIndexRootName());
+ assertEquals(replicatedIndexLocationStrategy.getNumberOfReplicas(), indexLocationStrategy.getNumberOfReplicas());
+ assertEquals(replicatedIndexLocationStrategy.getNumberOfShards(),indexLocationStrategy.getNumberOfShards());
+ assertEquals(replicatedIndexLocationStrategy.getAlias().getReadAlias(),indexLocationStrategy.getAlias().getReadAlias());
+ assertEquals(replicatedIndexLocationStrategy.getAlias().getWriteAlias(),indexLocationStrategy.getAlias().getWriteAlias());
+
+
}
}
[02/14] incubator-usergrid git commit: Update set data migration
version resource to receive integers and echo back the post-update versions.
Posted by to...@apache.org.
Update set data migration version resource to receive integers and echo back the post-update versions.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/7fbb9f54
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/7fbb9f54
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/7fbb9f54
Branch: refs/heads/observable-query-fix
Commit: 7fbb9f54823f84f34cf3ae92a7cea3f64ac650ec
Parents: 48689eb
Author: Michael Russo <mi...@gmail.com>
Authored: Fri Jul 10 15:09:47 2015 -0700
Committer: Michael Russo <mi...@gmail.com>
Committed: Fri Jul 10 15:09:47 2015 -0700
----------------------------------------------------------------------
.../apache/usergrid/rest/MigrateResource.java | 26 +++++++++++++++-----
1 file changed, 20 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7fbb9f54/stack/rest/src/main/java/org/apache/usergrid/rest/MigrateResource.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/MigrateResource.java b/stack/rest/src/main/java/org/apache/usergrid/rest/MigrateResource.java
index ed0604a..da0ba0f 100644
--- a/stack/rest/src/main/java/org/apache/usergrid/rest/MigrateResource.java
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/MigrateResource.java
@@ -130,23 +130,37 @@ public class MigrateResource extends AbstractContextResource {
Preconditions.checkNotNull( json, "You must provide a json body" );
Preconditions.checkArgument( json.keySet().size() > 0, "You must specify at least one module and version" );
+ ApiResponse response = createApiResponse();
+ response.setAction("Set Migration Versions");
+
+ ObjectNode node = JsonNodeFactory.instance.objectNode();
+
+ final DataMigrationManager dataMigrationManager = getDataMigrationManager();
+ final Set<String> plugins = dataMigrationManager.getPluginNames();
+
/**
* Set the migration version for the plugins specified
*/
for ( final String key : json.keySet() ) {
- String version = ( String ) json.get( key );
- Preconditions.checkArgument( version != null && version.length() > 0,
- "You must specify a version field per module name" );
+ int version = ( int ) json.get( key );
+ dataMigrationManager.resetToVersion(key, version);
+ }
- int intVersion = Integer.parseInt( version );
- getDataMigrationManager().resetToVersion( key, intVersion );
+ /**
+ * Echo back a response of the current versions for all plugins
+ */
+ for(final String pluginName: plugins){
+ node.put(pluginName, dataMigrationManager.getCurrentVersion(pluginName));
}
- return migrateStatus( ui, callback );
+ response.setData( node );
+ response.setSuccess();
+
+ return new JSONWithPadding( response, callback );
}
[13/14] incubator-usergrid git commit: Merge branch 'pr/304' into
USERGRID-840
Posted by to...@apache.org.
Merge branch 'pr/304' into USERGRID-840
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/a9c9581b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/a9c9581b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/a9c9581b
Branch: refs/heads/observable-query-fix
Commit: a9c9581b59b666a5319ad0bb040c02fd18d610df
Parents: d342875 a721558
Author: Shawn Feldman <sf...@apache.org>
Authored: Mon Jul 13 18:19:48 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Mon Jul 13 18:19:48 2015 -0600
----------------------------------------------------------------------
.../asyncevents/AmazonAsyncEventService.java | 3 +-
.../usergrid/persistence/queue/QueueScope.java | 14 +++
.../persistence/queue/impl/QueueScopeImpl.java | 7 +-
.../queue/impl/SNSQueueManagerImpl.java | 96 ++++++++++++++------
.../queue/util/AmazonNotificationUtils.java | 42 +++++++++
.../persistence/queue/QueueManagerTest.java | 2 +-
.../notifications/NotificationsService.java | 2 +-
.../services/notifications/QueueListener.java | 2 +-
.../usergrid/services/queues/QueueListener.java | 2 +-
9 files changed, 134 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a9c9581b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --cc stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index 70eb361,bd97d66..11ca2d3
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@@ -106,10 -104,9 +107,10 @@@ public class AmazonAsyncEventService im
this.indexService = indexService;
this.entityCollectionManagerFactory = entityCollectionManagerFactory;
- this.rxTaskScheduler = rxTaskScheduler;
+ this.indexLocationStrategyFactory = indexLocationStrategyFactory;
+ this.entityIndexFactory = entityIndexFactory;
- final QueueScope queueScope = new QueueScopeImpl(QUEUE_NAME);
+ this.queueScope = new QueueScopeImpl(QUEUE_NAME, QueueScope.RegionImplementation.ALLREGIONS);
this.queue = queueManagerFactory.getQueueManager(queueScope);
this.indexProcessorFig = indexProcessorFig;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a9c9581b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
index 1214a39,60138ee..f41d238
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
@@@ -372,9 -407,9 +408,9 @@@ public class SNSQueueManagerImpl implem
if (logger.isDebugEnabled()) logger.debug("Publishing Message...{} to arn: {}", stringBody, topicArn);
- PublishRequest publishRequest = new PublishRequest(topicArn, toString(body));
+ PublishRequest publishRequest = new PublishRequest(topicArn, stringBody);
- sns.publishAsync(publishRequest, new AsyncHandler<PublishRequest, PublishResult>() {
+ snsAsync.publishAsync(publishRequest, new AsyncHandler<PublishRequest, PublishResult>() {
@Override
public void onError(Exception e) {
logger.error("Error publishing message... {}", e);
[03/14] incubator-usergrid git commit: Merge branch 'two-dot-o-dev'
into pr/303
Posted by to...@apache.org.
Merge branch 'two-dot-o-dev' into pr/303
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/1adcfd29
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/1adcfd29
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/1adcfd29
Branch: refs/heads/observable-query-fix
Commit: 1adcfd292ad42fdc65ace103fb8f3b54e51f2cb4
Parents: 7fbb9f5 a842d57
Author: Shawn Feldman <sf...@apache.org>
Authored: Fri Jul 10 16:19:19 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Fri Jul 10 16:19:19 2015 -0600
----------------------------------------------------------------------
.../rx/ObservableToBlockingIteratorFactory.java | 107 ++++++++++++-------
.../persistence/core/rx/OrderedMergeTest.java | 35 ++++--
2 files changed, 96 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
[10/14] incubator-usergrid git commit: Add support for having single
region and all region AWS queue implementations.
Posted by to...@apache.org.
Add support for having single region and all region AWS queue implementations.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/c467a385
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/c467a385
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/c467a385
Branch: refs/heads/observable-query-fix
Commit: c467a3858ed859443666d6f497832bae8d1652f9
Parents: ad3916f
Author: Michael Russo <mi...@gmail.com>
Authored: Mon Jul 13 16:31:18 2015 -0700
Committer: Michael Russo <mi...@gmail.com>
Committed: Mon Jul 13 16:31:18 2015 -0700
----------------------------------------------------------------------
.../asyncevents/AmazonAsyncEventService.java | 3 ++-
.../apache/usergrid/persistence/queue/QueueScope.java | 14 ++++++++++++++
.../persistence/queue/impl/QueueScopeImpl.java | 7 ++++++-
.../persistence/queue/impl/SNSQueueManagerImpl.java | 4 ++--
.../usergrid/persistence/queue/QueueManagerTest.java | 2 +-
.../services/notifications/NotificationsService.java | 2 +-
.../services/notifications/QueueListener.java | 2 +-
.../usergrid/services/queues/QueueListener.java | 2 +-
8 files changed, 28 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c467a385/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index c5b836b..bd97d66 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -73,6 +73,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
private static final String QUEUE_NAME = "es_queue";
private final QueueManager queue;
+ private final QueueScope queueScope;
private final IndexProcessorFig indexProcessorFig;
private final IndexService indexService;
private final EntityCollectionManagerFactory entityCollectionManagerFactory;
@@ -105,7 +106,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
this.entityCollectionManagerFactory = entityCollectionManagerFactory;
this.rxTaskScheduler = rxTaskScheduler;
- final QueueScope queueScope = new QueueScopeImpl(QUEUE_NAME);
+ this.queueScope = new QueueScopeImpl(QUEUE_NAME, QueueScope.RegionImplementation.ALLREGIONS);
this.queue = queueManagerFactory.getQueueManager(queueScope);
this.indexProcessorFig = indexProcessorFig;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c467a385/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueScope.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueScope.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueScope.java
index cf6bf24..4beacf6 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueScope.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueScope.java
@@ -24,8 +24,22 @@ import org.apache.usergrid.persistence.core.scope.ApplicationScope;
public interface QueueScope {
/**
+ * LOCALREGION will create an SNS topic with a queue subscription in a single AWS region.
+ * ALLREGIONS will create SNS topics and queue subscriptions in ALL AWS regions.
+ */
+ enum RegionImplementation {
+ LOCALREGION,
+ ALLREGIONS
+ }
+
+ /**
* Get the name of the map
* @return
*/
public String getName();
+
+ /**
+ * Get the Usergrid region enum
+ */
+ public RegionImplementation getRegionImplementation();
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c467a385/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueScopeImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueScopeImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueScopeImpl.java
index 381cd8e..09a0bcd 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueScopeImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueScopeImpl.java
@@ -26,9 +26,11 @@ import org.apache.usergrid.persistence.queue.QueueScope;
public class QueueScopeImpl implements QueueScope {
private final String name;
+ private final RegionImplementation regionImpl;
- public QueueScopeImpl( final String name ) {
+ public QueueScopeImpl( final String name, final RegionImplementation regionImpl) {
this.name = name;
+ this.regionImpl = regionImpl;
}
@@ -40,6 +42,9 @@ public class QueueScopeImpl implements QueueScope {
}
@Override
+ public RegionImplementation getRegionImplementation() {return regionImpl;}
+
+ @Override
public boolean equals( final Object o ) {
if ( this == o ) {
return true;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c467a385/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
index c5a0f30..60138ee 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
@@ -158,7 +158,7 @@ public class SNSQueueManagerImpl implements QueueManager {
logger.error(String.format("Unable to subscribe PRIMARY queue=[%s] to topic=[%s]", queueUrl, primaryTopicArn), e);
}
- if (fig.isMultiRegion()) {
+ if (fig.isMultiRegion() && scope.getRegionImplementation() == QueueScope.RegionImplementation.ALLREGIONS) {
String multiRegion = fig.getRegionList();
@@ -299,7 +299,7 @@ public class SNSQueueManagerImpl implements QueueManager {
private String getName() {
- String name = clusterFig.getClusterName() + "_" + scope.getName();
+ String name = clusterFig.getClusterName() + "_" + scope.getName() + "_" + scope.getRegionImplementation();
Preconditions.checkArgument(name.length() <= 80, "Your name must be < than 80 characters");
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c467a385/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
index 33fa1f5..0ed6065 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
@@ -65,7 +65,7 @@ public class QueueManagerTest {
@Before
public void mockApp() {
- this.scope = new QueueScopeImpl( "testQueue" );
+ this.scope = new QueueScopeImpl( "testQueue", QueueScope.RegionImplementation.LOCALREGION );
qm = qmf.getQueueManager(scope);
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c467a385/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
index 76f10c9..130756d 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
@@ -108,7 +108,7 @@ public class NotificationsService extends AbstractCollectionService {
postTimer = metricsService.getTimer(this.getClass(), "collection.post_requests");
JobScheduler jobScheduler = new JobScheduler(sm,em);
String name = ApplicationQueueManagerImpl.getQueueNames( props );
- QueueScope queueScope = new QueueScopeImpl( name );
+ QueueScope queueScope = new QueueScopeImpl( name, QueueScope.RegionImplementation.LOCALREGION );
queueManagerFactory = getApplicationContext().getBean( Injector.class ).getInstance(QueueManagerFactory.class);
QueueManager queueManager = queueManagerFactory.getQueueManager(queueScope);
notificationQueueManager = new ApplicationQueueManagerImpl(jobScheduler,em,queueManager,metricsService,props);
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c467a385/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
index 8c765ac..eba0060 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
@@ -146,7 +146,7 @@ public class QueueListener {
com.codahale.metrics.Timer timer = metricsService.getTimer(QueueListener.class, "execute.dequeue");
svcMgr = smf.getServiceManager(smf.getManagementAppId());
LOG.info("getting from queue {} ", queueName);
- QueueScope queueScope = new QueueScopeImpl( queueName ) {};
+ QueueScope queueScope = new QueueScopeImpl( queueName, QueueScope.RegionImplementation.LOCALREGION );
QueueManager queueManager = TEST_QUEUE_MANAGER != null ? TEST_QUEUE_MANAGER : queueManagerFactory.getQueueManager(queueScope);
// run until there are no more active jobs
final AtomicLong runCount = new AtomicLong(0);
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c467a385/stack/services/src/main/java/org/apache/usergrid/services/queues/QueueListener.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/queues/QueueListener.java b/stack/services/src/main/java/org/apache/usergrid/services/queues/QueueListener.java
index d38db95..bd57bb3 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/queues/QueueListener.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/queues/QueueListener.java
@@ -160,7 +160,7 @@ public abstract class QueueListener {
LOG.info("QueueListener: Starting execute process.");
svcMgr = smf.getServiceManager(smf.getManagementAppId());
LOG.info("getting from queue {} ", queueName);
- QueueScope queueScope = new QueueScopeImpl( queueName);
+ QueueScope queueScope = new QueueScopeImpl( queueName, QueueScope.RegionImplementation.LOCALREGION);
QueueManager queueManager = TEST_QUEUE_MANAGER != null ? TEST_QUEUE_MANAGER : queueManagerFactory.getQueueManager(queueScope);
// run until there are no more active jobs
long runCount = 0;
[06/14] incubator-usergrid git commit: adding async events
Posted by to...@apache.org.
adding async events
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/cbea6e0b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/cbea6e0b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/cbea6e0b
Branch: refs/heads/observable-query-fix
Commit: cbea6e0bdf20d975e538b5ac6ec9b9b7110c0367
Parents: de1daae
Author: Shawn Feldman <sf...@apache.org>
Authored: Mon Jul 13 11:15:24 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Mon Jul 13 11:15:24 2015 -0600
----------------------------------------------------------------------
.../asyncevents/AmazonAsyncEventService.java | 19 +++++++----
.../asyncevents/AsyncEventService.java | 8 +++++
.../asyncevents/InMemoryAsyncEventService.java | 14 ++++++++
.../asyncevents/model/AsyncEvent.java | 17 ++++++++-
.../model/InitializeApplicationIndexEvent.java | 36 ++++++++++++++++++++
.../model/InitializeManagementIndexEvent.java | 32 +++++++++++++++++
.../index/ApplicationIndexLocationStrategy.java | 7 ----
.../index/IndexLocationStrategyFactoryImpl.java | 2 +-
8 files changed, 120 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cbea6e0b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index c5b836b..e15824f 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -30,6 +30,7 @@ import com.google.common.base.Preconditions;
import org.apache.usergrid.corepersistence.CpEntityManager;
import org.apache.usergrid.corepersistence.asyncevents.model.*;
import org.apache.usergrid.corepersistence.rx.impl.EdgeScope;
+import org.apache.usergrid.persistence.index.IndexLocationStrategy;
import org.apache.usergrid.utils.UUIDUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -76,11 +77,9 @@ public class AmazonAsyncEventService implements AsyncEventService {
private final IndexProcessorFig indexProcessorFig;
private final IndexService indexService;
private final EntityCollectionManagerFactory entityCollectionManagerFactory;
- private final RxTaskScheduler rxTaskScheduler;
private final Timer readTimer;
private final Timer writeTimer;
- private final Timer messageProcessingTimer;
private final Object mutex = new Object();
@@ -98,12 +97,11 @@ public class AmazonAsyncEventService implements AsyncEventService {
final IndexProcessorFig indexProcessorFig,
final MetricsFactory metricsFactory,
final IndexService indexService,
- final EntityCollectionManagerFactory entityCollectionManagerFactory,
- final RxTaskScheduler rxTaskScheduler) {
+ final EntityCollectionManagerFactory entityCollectionManagerFactory
+ ) {
this.indexService = indexService;
this.entityCollectionManagerFactory = entityCollectionManagerFactory;
- this.rxTaskScheduler = rxTaskScheduler;
final QueueScope queueScope = new QueueScopeImpl(QUEUE_NAME);
this.queue = queueManagerFactory.getQueueManager(queueScope);
@@ -111,7 +109,6 @@ public class AmazonAsyncEventService implements AsyncEventService {
this.writeTimer = metricsFactory.getTimer(AmazonAsyncEventService.class, "async_event.write");
this.readTimer = metricsFactory.getTimer(AmazonAsyncEventService.class, "async_event.read");
- this.messageProcessingTimer = metricsFactory.getTimer(AmazonAsyncEventService.class, "async_event.message_processing");
this.indexErrorCounter = metricsFactory.getCounter(AmazonAsyncEventService.class, "async_event.error");
this.messageCycle = metricsFactory.getHistogram(AmazonAsyncEventService.class, "async_event.message_cycle");
@@ -244,6 +241,16 @@ public class AmazonAsyncEventService implements AsyncEventService {
@Override
+ public void queueInitializeApplicationIndex( final ApplicationScope applicationScope) {
+ offer( new InitializeApplicationIndexEvent( applicationScope ) );
+ }
+
+ @Override
+ public void queueInitializeManagementIndex() {
+ offer( new InitializeManagementIndexEvent( ) );
+ }
+
+ @Override
public void queueEntityIndexUpdate(final ApplicationScope applicationScope,
final Entity entity) {
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cbea6e0b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
index 9fbed39..1c97c3e 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
@@ -23,6 +23,7 @@ package org.apache.usergrid.corepersistence.asyncevents;
import org.apache.usergrid.corepersistence.index.ReIndexAction;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.index.IndexLocationStrategy;
import org.apache.usergrid.persistence.model.entity.Entity;
import org.apache.usergrid.persistence.model.entity.Id;
@@ -34,6 +35,13 @@ public interface AsyncEventService extends ReIndexAction {
/**
+ * Initialize index for creation
+ * @param applicationScope
+ */
+ void queueInitializeApplicationIndex( final ApplicationScope applicationScope );
+ void queueInitializeManagementIndex( );
+
+ /**
* Queue an entity to be indexed. This will start processing immediately. For implementations that are realtime (akka, in memory)
* We will return a distributed future. For SQS impls, this will return immediately, and the result will not be available.
* After SQS is removed, the tests should be enhanced to ensure that we're processing our queues correctly.
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cbea6e0b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
index b8e544d..2fc0bb2 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
@@ -20,6 +20,8 @@
package org.apache.usergrid.corepersistence.asyncevents;
+import com.amazonaws.services.opsworks.model.App;
+import org.apache.usergrid.persistence.index.IndexLocationStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -63,6 +65,18 @@ public class InMemoryAsyncEventService implements AsyncEventService {
@Override
+ public void queueInitializeApplicationIndex(final ApplicationScope applicationScope) {
+ //index will be initialized locally, don't need to inform other indexes
+ return;
+ }
+
+ @Override
+ public void queueInitializeManagementIndex() {
+
+ }
+
+
+ @Override
public void queueEntityIndexUpdate( final ApplicationScope applicationScope, final Entity entity ) {
//process the entity immediately
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cbea6e0b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
index 66476f9..b331b6e 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
@@ -61,6 +61,12 @@ public class AsyncEvent implements Serializable {
protected AsyncEvent() {
}
+ public AsyncEvent(final EventType eventType) {
+
+ this.eventType = eventType;
+ this.creationTime = System.currentTimeMillis();
+ }
+
public AsyncEvent(final EventType eventType,
final EntityIdScope entityIdScope) {
@@ -69,6 +75,12 @@ public class AsyncEvent implements Serializable {
this.creationTime = System.currentTimeMillis();
}
+ public AsyncEvent(EventType eventType, ApplicationScope applicationScope) {
+ this.eventType = eventType;
+ this.applicationScope = applicationScope;
+ this.creationTime = System.currentTimeMillis();
+ }
+
public AsyncEvent(EventType eventType, ApplicationScope applicationScope, Edge edge) {
this.eventType = eventType;
this.applicationScope = applicationScope;
@@ -136,7 +148,10 @@ public class AsyncEvent implements Serializable {
EDGE_DELETE,
EDGE_INDEX,
ENTITY_DELETE,
- ENTITY_INDEX;
+ ENTITY_INDEX,
+ APPLICATION_INDEX,
+ MANAGEMENT_INDEX;
+ ;
public String asString() {
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cbea6e0b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/InitializeApplicationIndexEvent.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/InitializeApplicationIndexEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/InitializeApplicationIndexEvent.java
new file mode 100644
index 0000000..6612e8b
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/InitializeApplicationIndexEvent.java
@@ -0,0 +1,36 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. The ASF licenses this file to You
+ * * under the Apache License, Version 2.0 (the "License"); you may not
+ * * use this file except in compliance with the License.
+ * * You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License. For additional information regarding
+ * * copyright in this work, please see the NOTICE file in the top level
+ * * directory of this distribution.
+ *
+ */
+package org.apache.usergrid.corepersistence.asyncevents.model;
+
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+
+/**
+ * event to init app index
+ */
+@JsonDeserialize(as = AsyncEvent.class)
+public class InitializeApplicationIndexEvent extends AsyncEvent {
+ public InitializeApplicationIndexEvent() {
+ }
+ public InitializeApplicationIndexEvent(final ApplicationScope applicationScope) {
+ super(EventType.APPLICATION_INDEX, applicationScope);
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cbea6e0b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/InitializeManagementIndexEvent.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/InitializeManagementIndexEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/InitializeManagementIndexEvent.java
new file mode 100644
index 0000000..0af249d
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/InitializeManagementIndexEvent.java
@@ -0,0 +1,32 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. The ASF licenses this file to You
+ * * under the Apache License, Version 2.0 (the "License"); you may not
+ * * use this file except in compliance with the License.
+ * * You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License. For additional information regarding
+ * * copyright in this work, please see the NOTICE file in the top level
+ * * directory of this distribution.
+ *
+ */
+package org.apache.usergrid.corepersistence.asyncevents.model;
+
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+
+/**
+ * Event to initialize mgmt index
+ */
+@JsonDeserialize(as = AsyncEvent.class)
+public class InitializeManagementIndexEvent extends AsyncEvent{
+ public InitializeManagementIndexEvent(){
+ super(EventType.MANAGEMENT_INDEX);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cbea6e0b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ApplicationIndexLocationStrategy.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ApplicationIndexLocationStrategy.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ApplicationIndexLocationStrategy.java
index fcfb09b..c105119 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ApplicationIndexLocationStrategy.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ApplicationIndexLocationStrategy.java
@@ -31,11 +31,8 @@ import org.apache.usergrid.utils.StringUtils;
* Strategy for getting the application index name.
*/
class ApplicationIndexLocationStrategy implements IndexLocationStrategy {
- private final ClusterFig clusterFig;
- private final CassandraFig cassandraFig;
private final IndexFig indexFig;
private final ApplicationScope applicationScope;
- private final ApplicationIndexBucketLocator applicationIndexBucketLocator;
private final String indexBucketName;
private final IndexAlias alias;
private final String indexRootName;
@@ -45,12 +42,8 @@ class ApplicationIndexLocationStrategy implements IndexLocationStrategy {
final IndexFig indexFig,
final ApplicationScope applicationScope,
final ApplicationIndexBucketLocator applicationIndexBucketLocator){
- this.clusterFig = clusterFig;
-
- this.cassandraFig = cassandraFig;
this.indexFig = indexFig;
this.applicationScope = applicationScope;
- this.applicationIndexBucketLocator = applicationIndexBucketLocator;
this.indexRootName = clusterFig.getClusterName() + "_" + cassandraFig.getApplicationKeyspace().toLowerCase();
this.alias = new ApplicationIndexAlias(indexFig, applicationScope, indexRootName);
this.indexBucketName = indexRootName + "_applications_" + applicationIndexBucketLocator.getBucket(applicationScope);
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cbea6e0b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexLocationStrategyFactoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexLocationStrategyFactoryImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexLocationStrategyFactoryImpl.java
index 2d71e41..6a99890 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexLocationStrategyFactoryImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexLocationStrategyFactoryImpl.java
@@ -47,7 +47,7 @@ public class IndexLocationStrategyFactoryImpl implements IndexLocationStrategyFa
this.applicationLocatorBucketStrategy = applicationLocatorBucketStrategy;
this.coreIndexFig = coreIndexFig;
}
- public IndexLocationStrategy getIndexLocationStrategy(ApplicationScope applicationScope){
+ public IndexLocationStrategy getIndexLocationStrategy( final ApplicationScope applicationScope){
if(CpNamingUtils.getManagementApplicationId().equals(applicationScope.getApplication())){
return new ManagementIndexLocationStrategy(clusterFig,cassandraFig,indexFig,coreIndexFig);
}
[04/14] incubator-usergrid git commit: Revert "adding offer instead
of put"
Posted by to...@apache.org.
Revert "adding offer instead of put"
This reverts commit a842d5700e9d374fc091a3085ddf5e7f30bf806a.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/de1daaeb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/de1daaeb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/de1daaeb
Branch: refs/heads/observable-query-fix
Commit: de1daaeb19c438f458cb608cfa9ede4ede5f8f97
Parents: 1adcfd2
Author: Shawn Feldman <sf...@apache.org>
Authored: Fri Jul 10 16:44:10 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Fri Jul 10 16:44:10 2015 -0600
----------------------------------------------------------------------
.../rx/ObservableToBlockingIteratorFactory.java | 107 +++++++------------
.../persistence/core/rx/OrderedMergeTest.java | 35 ++----
2 files changed, 46 insertions(+), 96 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/de1daaeb/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ObservableToBlockingIteratorFactory.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ObservableToBlockingIteratorFactory.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ObservableToBlockingIteratorFactory.java
index ec5056e..9807749 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ObservableToBlockingIteratorFactory.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ObservableToBlockingIteratorFactory.java
@@ -31,8 +31,6 @@ import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.TimeUnit;
/**
* Returns an Iterator that iterates over all items emitted by a specified Observable.
@@ -56,7 +54,7 @@ public final class ObservableToBlockingIteratorFactory {
* @return the iterator that could be used to iterate over the elements of the observable.
*/
public static <T> Iterator<T> toIterator(Observable<? extends T> source) {
- final BlockingQueue<Notification<? extends T>> notifications = new SynchronousQueue<>(true);
+ final BlockingQueue<Notification<? extends T>> notifications = new ArrayBlockingQueue<>(1);
// using subscribe instead of unsafeSubscribe since this is a BlockingObservable "final subscribe"
final Subscription subscription = source.materialize().subscribe(new Subscriber<Notification<? extends T>>() {
@@ -67,88 +65,61 @@ public final class ObservableToBlockingIteratorFactory {
@Override
public void onError(Throwable e) {
- boolean offerFinished = false;
try{
- do {
- offerFinished = notifications.offer(Notification.<T>createOnError(e), 1000, TimeUnit.MILLISECONDS);
- }while (!offerFinished && !this.isUnsubscribed());
- }catch (InterruptedException t){
+ notifications.put(Notification.<T>createOnError(e));
+ }catch (Exception t){
}
}
@Override
public void onNext(Notification<? extends T> args) {
- boolean offerFinished = false;
+ try{
+ notifications.put(args);
+ }catch (Exception t){
- try {
- do {
- offerFinished = notifications.offer(args, 1000, TimeUnit.MILLISECONDS);
- } while (!offerFinished && !this.isUnsubscribed());
+ }
+ }
+ });
- } catch (InterruptedException t) {
+ return new Iterator<T>() {
+ private Notification<? extends T> buf;
+ @Override
+ public boolean hasNext() {
+ if (buf == null) {
+ buf = take();
}
+ if (buf.isOnError()) {
+ throw Exceptions.propagate(buf.getThrowable());
+ }
+ return !buf.isOnCompleted();
}
@Override
- protected void finalize() throws Throwable {
- super.finalize();
+ public T next() {
+ if (hasNext()) {
+ T result = buf.getValue();
+ buf = null;
+ return result;
+ }
+ throw new NoSuchElementException();
}
- });
-
- return new ObservableBlockingIterator<T>(notifications,subscription);
- }
-
- private static class ObservableBlockingIterator<T> implements Iterator<T> {
- private final BlockingQueue<Notification<? extends T>> notifications;
- private final Subscription subscription;
- public ObservableBlockingIterator(BlockingQueue<Notification<? extends T>> notifications, Subscription subscription) {
- this.notifications = notifications;
- this.subscription = subscription;
- }
-
- private Notification<? extends T> buf;
-
- @Override
- public boolean hasNext() {
- if (buf == null) {
- buf = take();
- }
- if (buf.isOnError()) {
- throw Exceptions.propagate(buf.getThrowable());
- }
- return !buf.isOnCompleted();
- }
-
- @Override
- public T next() {
- if (hasNext()) {
- T result = buf.getValue();
- buf = null;
- return result;
- }
- throw new NoSuchElementException();
- }
-
- private Notification<? extends T> take() {
- try {
- return notifications.take();
- } catch (InterruptedException e) {
- subscription.unsubscribe();
- throw Exceptions.propagate(e);
+ private Notification<? extends T> take() {
+ try {
+ return notifications.take();
+ } catch (InterruptedException e) {
+ subscription.unsubscribe();
+ throw Exceptions.propagate(e);
+ }
}
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException("Read-only iterator");
- }
- @Override
- protected void finalize() throws Throwable {
- super.finalize();
- }
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException("Read-only iterator");
+ }
+ };
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/de1daaeb/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/rx/OrderedMergeTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/rx/OrderedMergeTest.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/rx/OrderedMergeTest.java
index a81ef8f..649ac7a 100644
--- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/rx/OrderedMergeTest.java
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/rx/OrderedMergeTest.java
@@ -547,7 +547,7 @@ public class OrderedMergeTest {
//pull from source
for (int i = 0; i < 10 && !subscriber.isUnsubscribed(); i++) {
//emit
- log.info("inner produce " + count);
+ log.info("loop " + count);
subscriber.onNext(count++);
}
}
@@ -559,34 +559,13 @@ public class OrderedMergeTest {
log.info("iteration " + o);
}).subscribeOn(Schedulers.io()));
//never
- for(int i =0; i<20;i++){
- Object it =iterator.next();
- log.info("iterate "+i);
- }
-
- iterator = ObservableToBlockingIteratorFactory.toIterator(Observable.create(subscriber -> {
- int count = 0;
- while (!subscriber.isUnsubscribed()) {
- //pull from source
- for (int i = 0; i < 10 && !subscriber.isUnsubscribed(); i++) {
- //emit
- log.info("inner produce " + count);
- subscriber.onNext(count++);
- }
- }
+ Object it =iterator.next();
+ it = iterator.next();
+ log.info("iterate");
+ it = iterator.next();
+ log.info("iterate");
- subscriber.onCompleted();
- })
- .onBackpressureBlock(1)
- .buffer(2)
- .doOnNext(o -> {
- log.info("iteration " + o);
- }).subscribeOn(Schedulers.io()));
- //never
- for(int i =0; i<20;i++){
- Object it =iterator.next();
- log.info("iterate "+i);
- }
+ Object size = it;
}
[14/14] incubator-usergrid git commit: Merge branch 'two-dot-o-dev'
of https://git-wip-us.apache.org/repos/asf/incubator-usergrid into
observable-query-fix
Posted by to...@apache.org.
Merge branch 'two-dot-o-dev' of https://git-wip-us.apache.org/repos/asf/incubator-usergrid into observable-query-fix
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/7256ea73
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/7256ea73
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/7256ea73
Branch: refs/heads/observable-query-fix
Commit: 7256ea731520ebf42e0c343a907410af35776411
Parents: 208332d a9c9581
Author: Todd Nine <tn...@apigee.com>
Authored: Tue Jul 14 11:09:57 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Tue Jul 14 11:09:57 2015 -0600
----------------------------------------------------------------------
.../corepersistence/CpEntityManagerFactory.java | 4 +-
.../asyncevents/AmazonAsyncEventService.java | 46 +++++--
.../asyncevents/AsyncEventService.java | 8 ++
.../asyncevents/AsyncIndexProvider.java | 14 +-
.../asyncevents/InMemoryAsyncEventService.java | 8 ++
.../asyncevents/model/AsyncEvent.java | 29 +++-
.../model/InitializeApplicationIndexEvent.java | 38 +++++
.../index/ApplicationIndexLocationStrategy.java | 7 -
.../index/IndexLocationStrategyFactoryImpl.java | 2 +-
.../index/ReplicatedIndexLocationStrategy.java | 137 +++++++++++++++++++
.../index/AmazonAsyncEventServiceTest.java | 10 +-
.../corepersistence/index/IndexNamingTest.java | 18 ++-
.../usergrid/persistence/queue/QueueScope.java | 14 ++
.../persistence/queue/impl/QueueScopeImpl.java | 7 +-
.../queue/impl/SNSQueueManagerImpl.java | 101 +++++++++-----
.../queue/util/AmazonNotificationUtils.java | 42 ++++++
.../persistence/queue/QueueManagerTest.java | 2 +-
.../apache/usergrid/rest/MigrateResource.java | 26 +++-
.../notifications/NotificationsService.java | 2 +-
.../services/notifications/QueueListener.java | 2 +-
.../usergrid/services/queues/QueueListener.java | 2 +-
21 files changed, 452 insertions(+), 67 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7256ea73/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
----------------------------------------------------------------------
[12/14] incubator-usergrid git commit: Add exception to failed policy
log statement.
Posted by to...@apache.org.
Add exception to failed policy log statement.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/a7215580
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/a7215580
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/a7215580
Branch: refs/heads/observable-query-fix
Commit: a721558092fe9a04d9f96cd4ac020e6b430cc359
Parents: c467a38
Author: Michael Russo <mi...@gmail.com>
Authored: Mon Jul 13 17:03:13 2015 -0700
Committer: Michael Russo <mi...@gmail.com>
Committed: Mon Jul 13 17:03:13 2015 -0700
----------------------------------------------------------------------
.../usergrid/persistence/queue/util/AmazonNotificationUtils.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a7215580/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/util/AmazonNotificationUtils.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/util/AmazonNotificationUtils.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/util/AmazonNotificationUtils.java
index 52a4925..1d86823 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/util/AmazonNotificationUtils.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/util/AmazonNotificationUtils.java
@@ -96,7 +96,7 @@ public class AmazonNotificationUtils {
try {
sqs.setQueueAttributes(queueAttributesRequest);
}catch (Exception e){
- logger.error("Failed to set permissions on QUEUE ARN=[{}] for TOPIC ARNs=[{}]", queueARN, topicARNs.toString());
+ logger.error("Failed to set permissions on QUEUE ARN=[{}] for TOPIC ARNs=[{}]", queueARN, topicARNs.toString(), e);
}
[05/14] incubator-usergrid git commit: Ensure all queues are
subscribed to the multi-region SNS topics with appropriate permissions.
Topic/queue creation converted back to synchronous SNS client,
leaving message send still using the async client.
Posted by to...@apache.org.
Ensure all queues are subscribed to the multi-region SNS topics with appropriate permissions. Topic/queue creation converted back to synchronous SNS client, leaving message send still using the async client.
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/ad3916ff
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/ad3916ff
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/ad3916ff
Branch: refs/heads/observable-query-fix
Commit: ad3916ffc9ec2285f88ec62c4752065a8117147c
Parents: 48689eb
Author: Michael Russo <mi...@gmail.com>
Authored: Sat Jul 11 17:25:20 2015 -0700
Committer: Michael Russo <mi...@gmail.com>
Committed: Sun Jul 12 16:35:54 2015 -0700
----------------------------------------------------------------------
.../queue/impl/SNSQueueManagerImpl.java | 92 ++++++++++++++------
.../queue/util/AmazonNotificationUtils.java | 42 +++++++++
2 files changed, 106 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ad3916ff/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
index 257f25e..c5a0f30 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
@@ -23,8 +23,8 @@ import com.amazonaws.handlers.AsyncHandler;
import com.amazonaws.regions.Region;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.sns.AmazonSNSAsyncClient;
+import com.amazonaws.services.sns.AmazonSNSClient;
import com.amazonaws.services.sns.model.*;
-import com.amazonaws.services.sns.util.Topics;
import com.amazonaws.services.sqs.AmazonSQSClient;
import com.amazonaws.services.sqs.model.*;
import com.fasterxml.jackson.core.JsonFactory;
@@ -55,7 +55,8 @@ public class SNSQueueManagerImpl implements QueueManager {
private final QueueFig fig;
private final ClusterFig clusterFig;
private final AmazonSQSClient sqs;
- private final AmazonSNSAsyncClient sns;
+ private final AmazonSNSClient sns;
+ private final AmazonSNSAsyncClient snsAsync;
private final JsonFactory JSON_FACTORY = new JsonFactory();
@@ -112,6 +113,7 @@ public class SNSQueueManagerImpl implements QueueManager {
try {
sqs = createSQSClient(getRegion());
sns = createSNSClient(getRegion());
+ snsAsync = createAsyncSNSClient(getRegion());
} catch (Exception e) {
throw new RuntimeException("Error setting up mapper", e);
@@ -144,7 +146,14 @@ public class SNSQueueManagerImpl implements QueueManager {
}
try {
- Topics.subscribeQueue(sns, sqs, primaryTopicArn, queueUrl);
+
+ SubscribeRequest primarySubscribeRequest = new SubscribeRequest(primaryTopicArn, "sqs", primaryQueueArn);
+ sns.subscribe(primarySubscribeRequest);
+
+ // ensure the SNS primary topic has permission to send to the primary SQS queue
+ List<String> primaryTopicArnList = new ArrayList<>();
+ primaryTopicArnList.add(primaryTopicArn);
+ AmazonNotificationUtils.setQueuePermissionsToReceive(sqs, queueUrl, primaryTopicArnList);
} catch (AmazonServiceException e) {
logger.error(String.format("Unable to subscribe PRIMARY queue=[%s] to topic=[%s]", queueUrl, primaryTopicArn), e);
}
@@ -167,19 +176,18 @@ public class SNSQueueManagerImpl implements QueueManager {
for (String regionName : regionNames) {
regionName = regionName.trim();
-
Regions regions = Regions.fromName(regionName);
Region region = Region.getRegion(regions);
- final AmazonSQSClient sqsClient = createSQSClient(region);
- final AmazonSNSAsyncClient snsClient = createSNSClient(region);
+ AmazonSQSClient sqsClient = createSQSClient(region);
+ AmazonSNSClient snsClient = createSNSClient(region); // do this stuff synchronously
+ // getTopicArn will create the SNS topic if it doesn't exist
String topicArn = AmazonNotificationUtils.getTopicArn(snsClient, queueName, true);
-
topicArns.put(topicArn, regionName);
+ // create the SQS queue if it doesn't exist
String queueArn = AmazonNotificationUtils.getQueueArnByName(sqsClient, queueName);
-
if (queueArn == null) {
queueUrl = AmazonNotificationUtils.createQueue(sqsClient, queueName, fig);
queueArn = AmazonNotificationUtils.getQueueArnByUrl(sqsClient, queueUrl);
@@ -197,36 +205,41 @@ public class SNSQueueManagerImpl implements QueueManager {
Regions sqsRegions = Regions.fromName(strSqsRegion);
Region sqsRegion = Region.getRegion(sqsRegions);
- final AmazonSQSClient sqsClient = createSQSClient(sqsRegion);
+ AmazonSQSClient subscribeSqsClient = createSQSClient(sqsRegion);
- logger.info("Creating subscriptions for QUEUE ARN=[{}]", queueARN);
+ // ensure the URL used to subscribe is for the correct name/region
+ String subscribeQueueUrl = AmazonNotificationUtils.getQueueUrlByName(subscribeSqsClient, queueName);
- for (Map.Entry<String, String> topicArnEntry : topicArns.entrySet()) {
- String topicARN = topicArnEntry.getKey();
+ // this list used later for adding permissions to queues
+ List<String> topicArnList = new ArrayList<>();
- logger.info("Creating subscriptions for TOPIC ARN=[{}]", topicArnEntry.getKey());
+ for (Map.Entry<String, String> topicArnEntry : topicArns.entrySet()) {
- String strSnsRegion = queueArnEntry.getValue();
+ String topicARN = topicArnEntry.getKey();
+ topicArnList.add(topicARN);
+ String strSnsRegion = topicArnEntry.getValue();
Regions snsRegions = Regions.fromName(strSnsRegion);
Region snsRegion = Region.getRegion(snsRegions);
- final AmazonSNSAsyncClient snsClient = createSNSClient(snsRegion);
+ AmazonSNSClient subscribeSnsClient = createSNSClient(snsRegion); // do this stuff synchronously
+ SubscribeRequest subscribeRequest = new SubscribeRequest(topicARN, "sqs", queueARN);
try {
logger.info("Subscribing Queue ARN/Region=[{} / {}] and Topic ARN/Region=[{} / {}]",
- queueARN,
- strSqsRegion,
- topicARN,
- strSnsRegion
+ queueARN,
+ strSqsRegion,
+ topicARN,
+ strSnsRegion
);
- Topics.subscribeQueue(
- snsClient,
- sqsClient,
- topicArnEntry.getKey(),
- queueArnEntry.getKey());
+ SubscribeResult subscribeResult = subscribeSnsClient.subscribe(subscribeRequest);
+ String subscriptionARN = subscribeResult.getSubscriptionArn();
+ if(logger.isDebugEnabled()){
+ logger.debug("Successfully subscribed Queue ARN=[{}] to Topic ARN=[{}], subscription ARN=[{}]", queueARN, topicARN, subscriptionARN);
+ }
+
} catch (Exception e) {
logger.error(String.format("ERROR Subscribing Queue ARN/Region=[%s / %s] and Topic ARN/Region=[%s / %s]",
@@ -238,14 +251,23 @@ public class SNSQueueManagerImpl implements QueueManager {
}
}
+
+ logger.info("Adding permission to receive messages...");
+ // add permission to each queue, providing a list of topics that it's subscribed to
+ AmazonNotificationUtils.setQueuePermissionsToReceive(subscribeSqsClient, subscribeQueueUrl, topicArnList);
+
}
}
return primaryTopicArn;
}
+ /**
+ * The Asynchronous SNS client is used for publishing events to SNS.
+ *
+ */
- private AmazonSNSAsyncClient createSNSClient(final Region region) {
+ private AmazonSNSAsyncClient createAsyncSNSClient(final Region region) {
final UsergridAwsCredentialsProvider ugProvider = new UsergridAwsCredentialsProvider();
/**
@@ -261,6 +283,20 @@ public class SNSQueueManagerImpl implements QueueManager {
return sns;
}
+ /**
+ * The Synchronous SNS client is used for creating topics and subscribing queues.
+ *
+ */
+ private AmazonSNSClient createSNSClient(final Region region) {
+ final UsergridAwsCredentialsProvider ugProvider = new UsergridAwsCredentialsProvider();
+
+ final AmazonSNSClient sns = new AmazonSNSClient(ugProvider.getCredentials());
+
+ sns.setRegion(region);
+
+ return sns;
+ }
+
private String getName() {
String name = clusterFig.getClusterName() + "_" + scope.getName();
@@ -346,7 +382,7 @@ public class SNSQueueManagerImpl implements QueueManager {
@Override
public void sendMessages(final List bodies) throws IOException {
- if (sns == null) {
+ if (snsAsync == null) {
logger.error("SNS client is null, perhaps it failed to initialize successfully");
return;
}
@@ -360,7 +396,7 @@ public class SNSQueueManagerImpl implements QueueManager {
@Override
public void sendMessage(final Object body) throws IOException {
- if (sns == null) {
+ if (snsAsync == null) {
logger.error("SNS client is null, perhaps it failed to initialize successfully");
return;
}
@@ -373,7 +409,7 @@ public class SNSQueueManagerImpl implements QueueManager {
PublishRequest publishRequest = new PublishRequest(topicArn, toString(body));
- sns.publishAsync(publishRequest, new AsyncHandler<PublishRequest, PublishResult>() {
+ snsAsync.publishAsync(publishRequest, new AsyncHandler<PublishRequest, PublishResult>() {
@Override
public void onError(Exception e) {
logger.error("Error publishing message... {}", e);
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ad3916ff/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/util/AmazonNotificationUtils.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/util/AmazonNotificationUtils.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/util/AmazonNotificationUtils.java
index f7b2e06..52a4925 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/util/AmazonNotificationUtils.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/util/AmazonNotificationUtils.java
@@ -1,6 +1,9 @@
package org.apache.usergrid.persistence.queue.util;
import com.amazonaws.AmazonServiceException;
+import com.amazonaws.auth.policy.*;
+import com.amazonaws.auth.policy.actions.SQSActions;
+import com.amazonaws.auth.policy.conditions.ConditionFactory;
import com.amazonaws.services.sns.AmazonSNSClient;
import com.amazonaws.services.sns.model.*;
import com.amazonaws.services.sns.util.Topics;
@@ -11,7 +14,9 @@ import org.apache.usergrid.persistence.queue.QueueFig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
/**
@@ -60,6 +65,43 @@ public class AmazonNotificationUtils {
return url;
}
+ public static void setQueuePermissionsToReceive(final AmazonSQSClient sqs,
+ final String queueUrl,
+ final List<String> topicARNs) throws Exception{
+
+ String queueARN = getQueueArnByUrl(sqs, queueUrl);
+
+ Statement statement = new Statement(Statement.Effect.Allow)
+ .withActions(SQSActions.SendMessage)
+ .withPrincipals(new Principal("*"))
+ .withResources(new Resource(queueARN));
+
+ List<Condition> conditions = new ArrayList<>();
+
+ for(String topicARN : topicARNs){
+
+ conditions.add(ConditionFactory.newSourceArnCondition(topicARN));
+
+ }
+ statement.setConditions(conditions);
+
+ Policy policy = new Policy("SubscriptionPermission").withStatements(statement);
+
+
+ final Map<String, String> queueAttributes = new HashMap<>();
+ queueAttributes.put("Policy", policy.toJson());
+
+ SetQueueAttributesRequest queueAttributesRequest = new SetQueueAttributesRequest(queueUrl, queueAttributes);
+
+ try {
+ sqs.setQueueAttributes(queueAttributesRequest);
+ }catch (Exception e){
+ logger.error("Failed to set permissions on QUEUE ARN=[{}] for TOPIC ARNs=[{}]", queueARN, topicARNs.toString());
+ }
+
+
+ }
+
public static String getQueueArnByName(final AmazonSQSClient sqs,
final String queueName)
[07/14] incubator-usergrid git commit: adding async events
Posted by to...@apache.org.
adding async events
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/a63c8175
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/a63c8175
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/a63c8175
Branch: refs/heads/observable-query-fix
Commit: a63c8175d362054f49ce25de53dbbbdf402e2141
Parents: cbea6e0
Author: Shawn Feldman <sf...@apache.org>
Authored: Mon Jul 13 14:09:37 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Mon Jul 13 14:09:37 2015 -0600
----------------------------------------------------------------------
.../corepersistence/CpEntityManagerFactory.java | 4 ++-
.../asyncevents/AmazonAsyncEventService.java | 35 ++++++++++++++++----
.../asyncevents/AsyncEventService.java | 1 -
.../asyncevents/AsyncIndexProvider.java | 14 ++++++--
.../asyncevents/InMemoryAsyncEventService.java | 6 ----
.../asyncevents/model/AsyncEvent.java | 4 +--
.../model/InitializeApplicationIndexEvent.java | 1 +
.../model/InitializeManagementIndexEvent.java | 32 ------------------
.../index/AmazonAsyncEventServiceTest.java | 10 +++++-
9 files changed, 54 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a63c8175/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
index 048c558..f24891e 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
@@ -154,6 +154,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
private void init() {
EntityManager em = getEntityManager(getManagementAppId());
+ indexService.queueInitializeApplicationIndex(CpNamingUtils.getApplicationScope(getManagementAppId()));
try {
if ( em.getApplication() == null ) {
@@ -252,9 +253,10 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
}
getSetup().setupApplicationKeyspace( applicationId, appName );
+ indexService.queueInitializeApplicationIndex(CpNamingUtils.getApplicationScope(applicationId));
if ( properties == null ) {
- properties = new TreeMap<>( CASE_INSENSITIVE_ORDER );
+ properties = new TreeMap<>( CASE_INSENSITIVE_ORDER);
}
properties.put( PROPERTY_NAME, appName );
EntityManager appEm = getEntityManager(applicationId);
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a63c8175/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index e15824f..50000df 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -29,7 +29,10 @@ import com.codahale.metrics.Histogram;
import com.google.common.base.Preconditions;
import org.apache.usergrid.corepersistence.CpEntityManager;
import org.apache.usergrid.corepersistence.asyncevents.model.*;
+import org.apache.usergrid.corepersistence.index.IndexLocationStrategyFactory;
import org.apache.usergrid.corepersistence.rx.impl.EdgeScope;
+import org.apache.usergrid.persistence.index.EntityIndex;
+import org.apache.usergrid.persistence.index.EntityIndexFactory;
import org.apache.usergrid.persistence.index.IndexLocationStrategy;
import org.apache.usergrid.utils.UUIDUtils;
import org.slf4j.Logger;
@@ -77,6 +80,8 @@ public class AmazonAsyncEventService implements AsyncEventService {
private final IndexProcessorFig indexProcessorFig;
private final IndexService indexService;
private final EntityCollectionManagerFactory entityCollectionManagerFactory;
+ private final IndexLocationStrategyFactory indexLocationStrategyFactory;
+ private final EntityIndexFactory entityIndexFactory;
private final Timer readTimer;
private final Timer writeTimer;
@@ -97,11 +102,15 @@ public class AmazonAsyncEventService implements AsyncEventService {
final IndexProcessorFig indexProcessorFig,
final MetricsFactory metricsFactory,
final IndexService indexService,
- final EntityCollectionManagerFactory entityCollectionManagerFactory
+ final EntityCollectionManagerFactory entityCollectionManagerFactory,
+ final IndexLocationStrategyFactory indexLocationStrategyFactory,
+ final EntityIndexFactory entityIndexFactory
) {
this.indexService = indexService;
this.entityCollectionManagerFactory = entityCollectionManagerFactory;
+ this.indexLocationStrategyFactory = indexLocationStrategyFactory;
+ this.entityIndexFactory = entityIndexFactory;
final QueueScope queueScope = new QueueScopeImpl(QUEUE_NAME);
this.queue = queueManagerFactory.getQueueManager(queueScope);
@@ -229,6 +238,10 @@ public class AmazonAsyncEventService implements AsyncEventService {
handleEntityIndexUpdate(message);
break;
+
+ case APPLICATION_INDEX:
+ handleInitializeApplicationIndex(message);
+
default:
logger.error("Unknown EventType: {}", event.getEventType());
@@ -242,13 +255,9 @@ public class AmazonAsyncEventService implements AsyncEventService {
@Override
public void queueInitializeApplicationIndex( final ApplicationScope applicationScope) {
- offer( new InitializeApplicationIndexEvent( applicationScope ) );
+ offer(new InitializeApplicationIndexEvent(applicationScope));
}
- @Override
- public void queueInitializeManagementIndex() {
- offer( new InitializeManagementIndexEvent( ) );
- }
@Override
public void queueEntityIndexUpdate(final ApplicationScope applicationScope,
@@ -363,6 +372,20 @@ public class AmazonAsyncEventService implements AsyncEventService {
}
+ public void handleInitializeApplicationIndex(final QueueMessage message) {
+ Preconditions.checkNotNull(message, "Queue Message cannot be null for handleInitializeApplicationIndex");
+
+ final AsyncEvent event = (AsyncEvent) message.getBody();
+ Preconditions.checkNotNull(message, "QueueMessage Body cannot be null for handleInitializeApplicationIndex");
+ Preconditions.checkArgument(event.getEventType() == AsyncEvent.EventType.APPLICATION_INDEX, String.format("Event Type for handleInitializeApplicationIndex must be APPLICATION_INDEX, got %s", event.getEventType()));
+
+ final ApplicationScope applicationScope = event.getApplicationScope();
+ final IndexLocationStrategy indexLocationStrategy = indexLocationStrategyFactory.getIndexLocationStrategy(applicationScope);
+ final EntityIndex index = entityIndexFactory.createEntityIndex(indexLocationStrategy);
+ index.initialize();
+ ack(message);
+ }
+
/**
* Loop through and start the workers
*/
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a63c8175/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
index 1c97c3e..a36a9ae 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
@@ -39,7 +39,6 @@ public interface AsyncEventService extends ReIndexAction {
* @param applicationScope
*/
void queueInitializeApplicationIndex( final ApplicationScope applicationScope );
- void queueInitializeManagementIndex( );
/**
* Queue an entity to be indexed. This will start processing immediately. For implementations that are realtime (akka, in memory)
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a63c8175/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
index f455f9c..0a58369 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
@@ -20,11 +20,13 @@
package org.apache.usergrid.corepersistence.asyncevents;
+import org.apache.usergrid.corepersistence.index.IndexLocationStrategyFactory;
import org.apache.usergrid.corepersistence.index.IndexProcessorFig;
import org.apache.usergrid.corepersistence.index.IndexService;
import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
+import org.apache.usergrid.persistence.index.EntityIndexFactory;
import org.apache.usergrid.persistence.queue.QueueManagerFactory;
import com.google.inject.Inject;
@@ -46,6 +48,8 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> {
private final RxTaskScheduler rxTaskScheduler;
private final EntityCollectionManagerFactory entityCollectionManagerFactory;
private final EventBuilder eventBuilder;
+ private final IndexLocationStrategyFactory indexLocationStrategyFactory;
+ private final EntityIndexFactory entityIndexFactory;
private AsyncEventService asyncEventService;
@@ -57,7 +61,9 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> {
final IndexService indexService,
final RxTaskScheduler rxTaskScheduler,
final EntityCollectionManagerFactory entityCollectionManagerFactory,
- final EventBuilder eventBuilder) {
+ final EventBuilder eventBuilder,
+ final IndexLocationStrategyFactory indexLocationStrategyFactory,
+ final EntityIndexFactory entityIndexFactory) {
this.indexProcessorFig = indexProcessorFig;
this.queueManagerFactory = queueManagerFactory;
@@ -66,6 +72,8 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> {
this.rxTaskScheduler = rxTaskScheduler;
this.entityCollectionManagerFactory = entityCollectionManagerFactory;
this.eventBuilder = eventBuilder;
+ this.indexLocationStrategyFactory = indexLocationStrategyFactory;
+ this.entityIndexFactory = entityIndexFactory;
}
@@ -90,10 +98,10 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> {
return new InMemoryAsyncEventService(eventBuilder, rxTaskScheduler, indexProcessorFig.resolveSynchronously());
case SQS:
return new AmazonAsyncEventService(queueManagerFactory, indexProcessorFig, metricsFactory, indexService,
- entityCollectionManagerFactory, rxTaskScheduler);
+ entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory );
case SNS:
return new AmazonAsyncEventService(queueManagerFactory, indexProcessorFig, metricsFactory, indexService,
- entityCollectionManagerFactory, rxTaskScheduler);
+ entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory );
default:
throw new IllegalArgumentException("Configuration value of " + getErrorValues() + " are allowed");
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a63c8175/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
index 2fc0bb2..adb4a90 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
@@ -71,12 +71,6 @@ public class InMemoryAsyncEventService implements AsyncEventService {
}
@Override
- public void queueInitializeManagementIndex() {
-
- }
-
-
- @Override
public void queueEntityIndexUpdate( final ApplicationScope applicationScope, final Entity entity ) {
//process the entity immediately
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a63c8175/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
index b331b6e..9278e0f 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
@@ -149,9 +149,7 @@ public class AsyncEvent implements Serializable {
EDGE_INDEX,
ENTITY_DELETE,
ENTITY_INDEX,
- APPLICATION_INDEX,
- MANAGEMENT_INDEX;
- ;
+ APPLICATION_INDEX;
public String asString() {
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a63c8175/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/InitializeApplicationIndexEvent.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/InitializeApplicationIndexEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/InitializeApplicationIndexEvent.java
index 6612e8b..656d820 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/InitializeApplicationIndexEvent.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/InitializeApplicationIndexEvent.java
@@ -28,6 +28,7 @@ import org.apache.usergrid.persistence.core.scope.ApplicationScope;
@JsonDeserialize(as = AsyncEvent.class)
public class InitializeApplicationIndexEvent extends AsyncEvent {
public InitializeApplicationIndexEvent() {
+ super(EventType.APPLICATION_INDEX);
}
public InitializeApplicationIndexEvent(final ApplicationScope applicationScope) {
super(EventType.APPLICATION_INDEX, applicationScope);
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a63c8175/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/InitializeManagementIndexEvent.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/InitializeManagementIndexEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/InitializeManagementIndexEvent.java
deleted file mode 100644
index 0af249d..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/InitializeManagementIndexEvent.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- *
- * * Licensed to the Apache Software Foundation (ASF) under one or more
- * * contributor license agreements. The ASF licenses this file to You
- * * under the Apache License, Version 2.0 (the "License"); you may not
- * * use this file except in compliance with the License.
- * * You may obtain a copy of the License at
- * *
- * * http://www.apache.org/licenses/LICENSE-2.0
- * *
- * * Unless required by applicable law or agreed to in writing, software
- * * distributed under the License is distributed on an "AS IS" BASIS,
- * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * * See the License for the specific language governing permissions and
- * * limitations under the License. For additional information regarding
- * * copyright in this work, please see the NOTICE file in the top level
- * * directory of this distribution.
- *
- */
-package org.apache.usergrid.corepersistence.asyncevents.model;
-
-import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
-
-/**
- * Event to initialize mgmt index
- */
-@JsonDeserialize(as = AsyncEvent.class)
-public class InitializeManagementIndexEvent extends AsyncEvent{
- public InitializeManagementIndexEvent(){
- super(EventType.MANAGEMENT_INDEX);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a63c8175/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
index 02f42b7..9cf896c 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
@@ -20,6 +20,7 @@
package org.apache.usergrid.corepersistence.index;
+import org.apache.usergrid.persistence.index.EntityIndexFactory;
import org.junit.Rule;
import org.junit.runner.RunWith;
@@ -71,10 +72,17 @@ public class AmazonAsyncEventServiceTest extends AsyncIndexServiceTest {
public RxTaskScheduler rxTaskScheduler;
+ @Inject
+ public IndexLocationStrategyFactory indexLocationStrategyFactory;
+
+
+ @Inject
+ public EntityIndexFactory entityIndexFactory;
+
@Override
protected AsyncEventService getAsyncEventService() {
return new AmazonAsyncEventService( queueManagerFactory, indexProcessorFig, metricsFactory, indexService,
- entityCollectionManagerFactory, rxTaskScheduler );
+ entityCollectionManagerFactory, indexLocationStrategyFactory, entityIndexFactory );
}
[08/14] incubator-usergrid git commit: adding async events
Posted by to...@apache.org.
adding async events
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/09873cec
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/09873cec
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/09873cec
Branch: refs/heads/observable-query-fix
Commit: 09873cec76ac5f11cef8e5cf39c870e823693619
Parents: a63c817
Author: Shawn Feldman <sf...@apache.org>
Authored: Mon Jul 13 14:58:01 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Mon Jul 13 14:58:01 2015 -0600
----------------------------------------------------------------------
.../asyncevents/AmazonAsyncEventService.java | 11 +-
.../asyncevents/AsyncEventService.java | 1 +
.../asyncevents/model/AsyncEvent.java | 15 ++-
.../model/InitializeApplicationIndexEvent.java | 5 +-
.../index/ReplicatedIndexLocationStrategy.java | 103 +++++++++++++++++++
5 files changed, 124 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/09873cec/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index 50000df..1390058 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -29,7 +29,7 @@ import com.codahale.metrics.Histogram;
import com.google.common.base.Preconditions;
import org.apache.usergrid.corepersistence.CpEntityManager;
import org.apache.usergrid.corepersistence.asyncevents.model.*;
-import org.apache.usergrid.corepersistence.index.IndexLocationStrategyFactory;
+import org.apache.usergrid.corepersistence.index.*;
import org.apache.usergrid.corepersistence.rx.impl.EdgeScope;
import org.apache.usergrid.persistence.index.EntityIndex;
import org.apache.usergrid.persistence.index.EntityIndexFactory;
@@ -38,9 +38,6 @@ import org.apache.usergrid.utils.UUIDUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.usergrid.corepersistence.index.EntityIndexOperation;
-import org.apache.usergrid.corepersistence.index.IndexProcessorFig;
-import org.apache.usergrid.corepersistence.index.IndexService;
import org.apache.usergrid.persistence.collection.EntityCollectionManager;
import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
@@ -255,7 +252,8 @@ public class AmazonAsyncEventService implements AsyncEventService {
@Override
public void queueInitializeApplicationIndex( final ApplicationScope applicationScope) {
- offer(new InitializeApplicationIndexEvent(applicationScope));
+ IndexLocationStrategy indexLocationStrategy = indexLocationStrategyFactory.getIndexLocationStrategy(applicationScope);
+ offer(new InitializeApplicationIndexEvent(new ReplicatedIndexLocationStrategy(indexLocationStrategy)));
}
@@ -379,8 +377,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
Preconditions.checkNotNull(message, "QueueMessage Body cannot be null for handleInitializeApplicationIndex");
Preconditions.checkArgument(event.getEventType() == AsyncEvent.EventType.APPLICATION_INDEX, String.format("Event Type for handleInitializeApplicationIndex must be APPLICATION_INDEX, got %s", event.getEventType()));
- final ApplicationScope applicationScope = event.getApplicationScope();
- final IndexLocationStrategy indexLocationStrategy = indexLocationStrategyFactory.getIndexLocationStrategy(applicationScope);
+ final IndexLocationStrategy indexLocationStrategy = event.getIndexLocationStrategy();
final EntityIndex index = entityIndexFactory.createEntityIndex(indexLocationStrategy);
index.initialize();
ack(message);
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/09873cec/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
index a36a9ae..1a5e865 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
@@ -22,6 +22,7 @@ package org.apache.usergrid.corepersistence.asyncevents;
import org.apache.usergrid.corepersistence.index.ReIndexAction;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.entities.Application;
import org.apache.usergrid.persistence.graph.Edge;
import org.apache.usergrid.persistence.index.IndexLocationStrategy;
import org.apache.usergrid.persistence.model.entity.Entity;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/09873cec/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
index 9278e0f..3fabc1c 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
@@ -25,6 +25,7 @@ import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.index.IndexLocationStrategy;
import org.apache.usergrid.persistence.model.entity.Id;
import java.io.Serializable;
@@ -37,6 +38,9 @@ import java.io.Serializable;
public class AsyncEvent implements Serializable {
@JsonProperty
+ protected IndexLocationStrategy indexLocationStrategy;
+
+ @JsonProperty
protected EventType eventType;
@JsonProperty
@@ -75,9 +79,9 @@ public class AsyncEvent implements Serializable {
this.creationTime = System.currentTimeMillis();
}
- public AsyncEvent(EventType eventType, ApplicationScope applicationScope) {
+ public AsyncEvent(EventType eventType, IndexLocationStrategy indexLocationStrategy) {
this.eventType = eventType;
- this.applicationScope = applicationScope;
+ this.indexLocationStrategy = indexLocationStrategy;
this.creationTime = System.currentTimeMillis();
}
@@ -132,6 +136,13 @@ public class AsyncEvent implements Serializable {
this.applicationScope = applicationScope;
}
+ @JsonSerialize
+ public IndexLocationStrategy getIndexLocationStrategy() { return indexLocationStrategy; }
+
+ public void setIndexLocationStrategy( IndexLocationStrategy indexLocationStrategy ){
+ this.indexLocationStrategy = indexLocationStrategy;
+ }
+
@JsonSerialize()
public Edge getEdge() {
return edge;
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/09873cec/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/InitializeApplicationIndexEvent.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/InitializeApplicationIndexEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/InitializeApplicationIndexEvent.java
index 656d820..8b20651 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/InitializeApplicationIndexEvent.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/InitializeApplicationIndexEvent.java
@@ -21,6 +21,7 @@ package org.apache.usergrid.corepersistence.asyncevents.model;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.index.IndexLocationStrategy;
/**
* event to init app index
@@ -30,8 +31,8 @@ public class InitializeApplicationIndexEvent extends AsyncEvent {
public InitializeApplicationIndexEvent() {
super(EventType.APPLICATION_INDEX);
}
- public InitializeApplicationIndexEvent(final ApplicationScope applicationScope) {
- super(EventType.APPLICATION_INDEX, applicationScope);
+ public InitializeApplicationIndexEvent(final IndexLocationStrategy indexLocationStrategy) {
+ super(EventType.APPLICATION_INDEX, indexLocationStrategy);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/09873cec/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReplicatedIndexLocationStrategy.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReplicatedIndexLocationStrategy.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReplicatedIndexLocationStrategy.java
new file mode 100644
index 0000000..b404a78
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReplicatedIndexLocationStrategy.java
@@ -0,0 +1,103 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. The ASF licenses this file to You
+ * * under the Apache License, Version 2.0 (the "License"); you may not
+ * * use this file except in compliance with the License.
+ * * You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License. For additional information regarding
+ * * copyright in this work, please see the NOTICE file in the top level
+ * * directory of this distribution.
+ *
+ */
+package org.apache.usergrid.corepersistence.index;
+
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.index.IndexAlias;
+import org.apache.usergrid.persistence.index.IndexLocationStrategy;
+
+/**
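+ * Plain-field copy of another {@link IndexLocationStrategy}: holds its alias, index names, application scope and shard/replica counts.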
+ */
+public class ReplicatedIndexLocationStrategy implements IndexLocationStrategy {
+
+ private ReplicatedIndexAlias alias;
+ private String rootName;
+ private String indexInitialName;
+ private ApplicationScope applicationScope;
+ private int numberShards;
+ private int numberReplicas;
+
+ public ReplicatedIndexLocationStrategy(){
+
+ }
+
+ public ReplicatedIndexLocationStrategy(IndexLocationStrategy indexLocationStrategy){
+ alias = new ReplicatedIndexAlias( indexLocationStrategy.getAlias() );
+ rootName = indexLocationStrategy.getIndexRootName();
+ indexInitialName = indexLocationStrategy.getIndexInitialName();
+ applicationScope = indexLocationStrategy.getApplicationScope();
+ numberShards = indexLocationStrategy.getNumberOfShards();
+ numberReplicas = indexLocationStrategy.getNumberOfReplicas();
+ }
+
+ @Override
+ public IndexAlias getAlias() {
+ return alias;
+ }
+
+ @Override
+ public String getIndexRootName() {
+ return rootName;
+ }
+
+ @Override
+ public String getIndexInitialName() {
+ return indexInitialName;
+ }
+
+ @Override
+ public ApplicationScope getApplicationScope() {
+ return applicationScope;
+ }
+
+ @Override
+ public int getNumberOfShards() {
+ return numberShards;
+ }
+
+ @Override
+ public int getNumberOfReplicas() {
+ return numberReplicas;
+ }
+
+ public static class ReplicatedIndexAlias implements IndexAlias{
+
+ private String readAlias;
+ private String writeAlias;
+
+ public ReplicatedIndexAlias(){
+
+ }
+ public ReplicatedIndexAlias(IndexAlias alias){
+ this.readAlias = alias.getReadAlias();
+ this.writeAlias = alias.getWriteAlias();
+ }
+ @Override
+ public String getReadAlias() {
+ return readAlias;
+ }
+
+ @Override
+ public String getWriteAlias() {
+ return writeAlias;
+ }
+ }
+}