You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/07/14 19:10:06 UTC

[01/14] incubator-usergrid git commit: adding offer instead of put

Repository: incubator-usergrid
Updated Branches:
  refs/heads/observable-query-fix 208332da9 -> 7256ea731


adding offer instead of put


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/a842d570
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/a842d570
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/a842d570

Branch: refs/heads/observable-query-fix
Commit: a842d5700e9d374fc091a3085ddf5e7f30bf806a
Parents: 48689eb
Author: Shawn Feldman <sf...@apache.org>
Authored: Fri Jul 10 10:33:43 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Fri Jul 10 10:33:43 2015 -0600

----------------------------------------------------------------------
 .../rx/ObservableToBlockingIteratorFactory.java | 107 ++++++++++++-------
 .../persistence/core/rx/OrderedMergeTest.java   |  35 ++++--
 2 files changed, 96 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a842d570/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ObservableToBlockingIteratorFactory.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ObservableToBlockingIteratorFactory.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ObservableToBlockingIteratorFactory.java
index 9807749..ec5056e 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ObservableToBlockingIteratorFactory.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ObservableToBlockingIteratorFactory.java
@@ -31,6 +31,8 @@ import java.util.Iterator;
 import java.util.NoSuchElementException;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Returns an Iterator that iterates over all items emitted by a specified Observable.
@@ -54,7 +56,7 @@ public final class ObservableToBlockingIteratorFactory {
      * @return the iterator that could be used to iterate over the elements of the observable.
      */
     public static <T> Iterator<T> toIterator(Observable<? extends T> source) {
-        final BlockingQueue<Notification<? extends T>> notifications = new ArrayBlockingQueue<>(1);
+        final BlockingQueue<Notification<? extends T>> notifications = new SynchronousQueue<>(true);
 
         // using subscribe instead of unsafeSubscribe since this is a BlockingObservable "final subscribe"
         final Subscription subscription = source.materialize().subscribe(new Subscriber<Notification<? extends T>>() {
@@ -65,61 +67,88 @@ public final class ObservableToBlockingIteratorFactory {
 
             @Override
             public void onError(Throwable e) {
+                boolean offerFinished = false;
                 try{
-                    notifications.put(Notification.<T>createOnError(e));
-                }catch (Exception t){
+                    do {
+                        offerFinished = notifications.offer(Notification.<T>createOnError(e), 1000, TimeUnit.MILLISECONDS);
+                    }while (!offerFinished && !this.isUnsubscribed());
+                }catch (InterruptedException t){
 
                 }
             }
 
             @Override
             public void onNext(Notification<? extends T> args) {
-                try{
-                    notifications.put(args);
-                }catch (Exception t){
+                boolean offerFinished = false;
 
-                }
-            }
-        });
+                try {
+                    do {
+                        offerFinished =  notifications.offer(args, 1000, TimeUnit.MILLISECONDS);
+                    } while (!offerFinished && !this.isUnsubscribed());
 
-        return new Iterator<T>() {
-            private Notification<? extends T> buf;
+                } catch (InterruptedException t) {
 
-            @Override
-            public boolean hasNext() {
-                if (buf == null) {
-                    buf = take();
                 }
-                if (buf.isOnError()) {
-                    throw Exceptions.propagate(buf.getThrowable());
-                }
-                return !buf.isOnCompleted();
             }
 
             @Override
-            public T next() {
-                if (hasNext()) {
-                    T result = buf.getValue();
-                    buf = null;
-                    return result;
-                }
-                throw new NoSuchElementException();
+            protected void finalize() throws Throwable {
+                super.finalize();
             }
+        });
 
-            private Notification<? extends T> take() {
-                try {
-                    return notifications.take();
-                } catch (InterruptedException e) {
-                    subscription.unsubscribe();
-                    throw Exceptions.propagate(e);
-                }
-            }
+        return new ObservableBlockingIterator<T>(notifications,subscription);
+    }
 
-            @Override
-            public void remove() {
-                throw new UnsupportedOperationException("Read-only iterator");
+    private static class ObservableBlockingIterator<T> implements Iterator<T> {
+        private final BlockingQueue<Notification<? extends T>> notifications;
+        private final Subscription subscription;
+
+        public ObservableBlockingIterator(BlockingQueue<Notification<? extends T>> notifications, Subscription subscription) {
+            this.notifications = notifications;
+            this.subscription = subscription;
+        }
+
+        private Notification<? extends T> buf;
+
+        @Override
+        public boolean hasNext() {
+            if (buf == null) {
+                buf = take();
             }
-        };
-    }
+            if (buf.isOnError()) {
+                throw Exceptions.propagate(buf.getThrowable());
+            }
+            return !buf.isOnCompleted();
+        }
+
+        @Override
+        public T next() {
+            if (hasNext()) {
+                T result = buf.getValue();
+                buf = null;
+                return result;
+            }
+            throw new NoSuchElementException();
+        }
+
+        private Notification<? extends T> take() {
+            try {
+                return notifications.take();
+            } catch (InterruptedException e) {
+                subscription.unsubscribe();
+                throw Exceptions.propagate(e);
+            }
+        }
 
+        @Override
+        public void remove() {
+            throw new UnsupportedOperationException("Read-only iterator");
+        }
+
+        @Override
+        protected void finalize() throws Throwable {
+            super.finalize();
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a842d570/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/rx/OrderedMergeTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/rx/OrderedMergeTest.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/rx/OrderedMergeTest.java
index 649ac7a..a81ef8f 100644
--- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/rx/OrderedMergeTest.java
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/rx/OrderedMergeTest.java
@@ -547,7 +547,7 @@ public class OrderedMergeTest {
                 //pull from source
                 for (int i = 0; i < 10 && !subscriber.isUnsubscribed(); i++) {
                     //emit
-                    log.info("loop " + count);
+                    log.info("inner produce " + count);
                     subscriber.onNext(count++);
                 }
             }
@@ -559,13 +559,34 @@ public class OrderedMergeTest {
                 log.info("iteration " + o);
             }).subscribeOn(Schedulers.io()));
         //never
-        Object it =iterator.next();
-        it = iterator.next();
-        log.info("iterate");
-        it = iterator.next();
-        log.info("iterate");
+        for(int i =0; i<20;i++){
+            Object it =iterator.next();
+            log.info("iterate "+i);
+        }
+
+        iterator = ObservableToBlockingIteratorFactory.toIterator(Observable.create(subscriber -> {
+            int count = 0;
+            while (!subscriber.isUnsubscribed()) {
+                //pull from source
+                for (int i = 0; i < 10 && !subscriber.isUnsubscribed(); i++) {
+                    //emit
+                    log.info("inner produce " + count);
+                    subscriber.onNext(count++);
+                }
+            }
 
-        Object size = it;
+            subscriber.onCompleted();
+        })
+            .onBackpressureBlock(1)
+            .buffer(2)
+            .doOnNext(o -> {
+                log.info("iteration " + o);
+            }).subscribeOn(Schedulers.io()));
+        //never
+        for(int i =0; i<20;i++){
+            Object it =iterator.next();
+            log.info("iterate "+i);
+        }
     }
 
 


[09/14] incubator-usergrid git commit: fix deserialization

Posted by to...@apache.org.
fix deserialization


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/68cafb7e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/68cafb7e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/68cafb7e

Branch: refs/heads/observable-query-fix
Commit: 68cafb7e878eb853f280b7374f53dfd499b0dcac
Parents: 09873ce
Author: Shawn Feldman <sf...@apache.org>
Authored: Mon Jul 13 16:57:51 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Mon Jul 13 16:57:51 2015 -0600

----------------------------------------------------------------------
 .../asyncevents/AmazonAsyncEventService.java    |  2 +-
 .../asyncevents/model/AsyncEvent.java           |  7 ++--
 .../index/ReplicatedIndexLocationStrategy.java  | 38 ++++++++++++++++++--
 .../queue/impl/SNSQueueManagerImpl.java         |  5 +--
 4 files changed, 45 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/68cafb7e/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index 1390058..70eb361 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -235,9 +235,9 @@ public class AmazonAsyncEventService implements AsyncEventService {
                         handleEntityIndexUpdate(message);
                         break;
 
-
                     case APPLICATION_INDEX:
                         handleInitializeApplicationIndex(message);
+                        break;
 
                     default:
                         logger.error("Unknown EventType: {}", event.getEventType());

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/68cafb7e/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
index 3fabc1c..3d22986 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
@@ -21,7 +21,9 @@ package org.apache.usergrid.corepersistence.asyncevents.model;
 
 import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
 import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import org.apache.usergrid.corepersistence.index.ReplicatedIndexLocationStrategy;
 import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.graph.Edge;
@@ -136,10 +138,11 @@ public class AsyncEvent implements Serializable {
         this.applicationScope = applicationScope;
     }
 
-    @JsonSerialize
+    @JsonSerialize()
+    @JsonDeserialize(as=ReplicatedIndexLocationStrategy.class)
     public IndexLocationStrategy getIndexLocationStrategy() { return indexLocationStrategy; }
 
-    public void setIndexLocationStrategy( IndexLocationStrategy indexLocationStrategy ){
+    protected void setIndexLocationStrategy( IndexLocationStrategy indexLocationStrategy ){
         this.indexLocationStrategy = indexLocationStrategy;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/68cafb7e/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReplicatedIndexLocationStrategy.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReplicatedIndexLocationStrategy.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReplicatedIndexLocationStrategy.java
index b404a78..e1b6b9e 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReplicatedIndexLocationStrategy.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReplicatedIndexLocationStrategy.java
@@ -19,6 +19,8 @@
  */
 package org.apache.usergrid.corepersistence.index;
 
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.index.IndexAlias;
 import org.apache.usergrid.persistence.index.IndexLocationStrategy;
@@ -28,7 +30,7 @@ import org.apache.usergrid.persistence.index.IndexLocationStrategy;
  */
 public class ReplicatedIndexLocationStrategy implements IndexLocationStrategy {
 
-    private ReplicatedIndexAlias alias;
+    private IndexAlias alias;
     private String rootName;
     private String indexInitialName;
     private ApplicationScope applicationScope;
@@ -49,35 +51,65 @@ public class ReplicatedIndexLocationStrategy implements IndexLocationStrategy {
     }
 
     @Override
+    @JsonSerialize()
+    @JsonDeserialize(as=ReplicatedIndexAlias.class)
     public IndexAlias getAlias() {
         return alias;
     }
 
+    protected void setAlias(IndexAlias alias) {
+        this.alias = alias;
+    }
+
+
     @Override
+    @JsonSerialize()
     public String getIndexRootName() {
         return rootName;
     }
+    protected void setIndexRootName(String indexRootName) {
+        this.rootName = indexRootName;
+    }
 
     @Override
+    @JsonSerialize()
     public String getIndexInitialName() {
         return indexInitialName;
     }
 
+    protected void setIndexInitialName(String indexInitialName) {
+        this.indexInitialName = indexInitialName;
+    }
+
+
     @Override
+    @JsonSerialize()
     public ApplicationScope getApplicationScope() {
         return applicationScope;
     }
 
+    protected void setApplicationScope(ApplicationScope applicationScope) {
+        this.applicationScope = applicationScope;
+    }
+
+
     @Override
+    @JsonSerialize()
     public int getNumberOfShards() {
         return numberShards;
     }
-
+    public void setNumberOfShards(int shards) {
+        numberShards = shards;
+    }
     @Override
+    @JsonSerialize()
     public int getNumberOfReplicas() {
         return numberReplicas;
     }
 
+    public void setNumberOfReplicas(int replicas) {
+        numberReplicas = replicas;
+    }
     public static class ReplicatedIndexAlias implements IndexAlias{
 
         private String readAlias;
@@ -91,11 +123,13 @@ public class ReplicatedIndexLocationStrategy implements IndexLocationStrategy {
             this.writeAlias = alias.getWriteAlias();
         }
         @Override
+        @JsonSerialize()
         public String getReadAlias() {
             return readAlias;
         }
 
         @Override
+        @JsonSerialize()
         public String getWriteAlias() {
             return writeAlias;
         }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/68cafb7e/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
index 257f25e..1214a39 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
@@ -321,7 +321,8 @@ public class SNSQueueManagerImpl implements QueueManager {
                 Object body;
 
                 try {
-                    JsonNode bodyObj = mapper.readTree(message.getBody()).get("Message");
+                    final JsonNode bodyNode =  mapper.readTree(message.getBody());
+                    JsonNode bodyObj = bodyNode.has("Message") ? bodyNode.get("Message") : bodyNode;
                     body = fromString(bodyObj.textValue(), klass);
                 } catch (Exception e) {
                     logger.error(String.format("failed to deserialize message: %s", message.getBody()), e);
@@ -371,7 +372,7 @@ public class SNSQueueManagerImpl implements QueueManager {
 
         if (logger.isDebugEnabled()) logger.debug("Publishing Message...{} to arn: {}", stringBody, topicArn);
 
-        PublishRequest publishRequest = new PublishRequest(topicArn, toString(body));
+        PublishRequest publishRequest = new PublishRequest(topicArn, stringBody);
 
         sns.publishAsync(publishRequest, new AsyncHandler<PublishRequest, PublishResult>() {
                 @Override


[11/14] incubator-usergrid git commit: change comment, add replicated test

Posted by to...@apache.org.
change comment, add replicated test


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/d342875c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/d342875c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/d342875c

Branch: refs/heads/observable-query-fix
Commit: d342875c4c910e49bf4cc48054e5fa99c0e88e70
Parents: 68cafb7
Author: Shawn Feldman <sf...@apache.org>
Authored: Mon Jul 13 17:43:34 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Mon Jul 13 17:43:34 2015 -0600

----------------------------------------------------------------------
 .../index/ReplicatedIndexLocationStrategy.java    |  2 +-
 .../corepersistence/index/IndexNamingTest.java    | 18 +++++++++++++++++-
 2 files changed, 18 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d342875c/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReplicatedIndexLocationStrategy.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReplicatedIndexLocationStrategy.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReplicatedIndexLocationStrategy.java
index e1b6b9e..4dd237a 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReplicatedIndexLocationStrategy.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReplicatedIndexLocationStrategy.java
@@ -26,7 +26,7 @@ import org.apache.usergrid.persistence.index.IndexAlias;
 import org.apache.usergrid.persistence.index.IndexLocationStrategy;
 
 /**
- * Classy class class.
+ * Strategy to replicate an index naming convention and publish elsewhere
  */
 public class ReplicatedIndexLocationStrategy implements IndexLocationStrategy {
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/d342875c/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/IndexNamingTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/IndexNamingTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/IndexNamingTest.java
index 2d77697..95ad17e 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/IndexNamingTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/IndexNamingTest.java
@@ -20,6 +20,7 @@
 package org.apache.usergrid.corepersistence.index;
 
 import com.google.inject.Inject;
+import junit.framework.Assert;
 import net.jcip.annotations.NotThreadSafe;
 import org.apache.usergrid.corepersistence.TestIndexModule;
 import org.apache.usergrid.corepersistence.util.CpNamingUtils;
@@ -173,7 +174,22 @@ public class IndexNamingTest {
             names.add(name);
         }
         //always hashes to diff't bucket
-        assertTrue(names.size()==indexProcessorFig.getNumberOfIndexBuckets());
+        assertTrue(names.size() == indexProcessorFig.getNumberOfIndexBuckets());
+    }
+
+    @Test
+    public void testReplication(){
+        IndexLocationStrategy indexLocationStrategy = indexLocationStrategyFactory.getIndexLocationStrategy(applicationScope);
+        ReplicatedIndexLocationStrategy replicatedIndexLocationStrategy = new ReplicatedIndexLocationStrategy(indexLocationStrategy);
+        assertEquals(replicatedIndexLocationStrategy.getApplicationScope(),indexLocationStrategy.getApplicationScope());
+        assertEquals(replicatedIndexLocationStrategy.getIndexInitialName(),indexLocationStrategy.getIndexInitialName());
+        assertEquals(replicatedIndexLocationStrategy.getIndexRootName(),indexLocationStrategy.getIndexRootName());
+        assertEquals(replicatedIndexLocationStrategy.getNumberOfReplicas(), indexLocationStrategy.getNumberOfReplicas());
+        assertEquals(replicatedIndexLocationStrategy.getNumberOfShards(),indexLocationStrategy.getNumberOfShards());
+        assertEquals(replicatedIndexLocationStrategy.getAlias().getReadAlias(),indexLocationStrategy.getAlias().getReadAlias());
+        assertEquals(replicatedIndexLocationStrategy.getAlias().getWriteAlias(),indexLocationStrategy.getAlias().getWriteAlias());
+
+
     }
 
 }


[02/14] incubator-usergrid git commit: Update set data migration version resource to receive integers and echo back the post-update versions.

Posted by to...@apache.org.
Update set data migration version resource to receive integers and echo back the post-update versions.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/7fbb9f54
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/7fbb9f54
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/7fbb9f54

Branch: refs/heads/observable-query-fix
Commit: 7fbb9f54823f84f34cf3ae92a7cea3f64ac650ec
Parents: 48689eb
Author: Michael Russo <mi...@gmail.com>
Authored: Fri Jul 10 15:09:47 2015 -0700
Committer: Michael Russo <mi...@gmail.com>
Committed: Fri Jul 10 15:09:47 2015 -0700

----------------------------------------------------------------------
 .../apache/usergrid/rest/MigrateResource.java   | 26 +++++++++++++++-----
 1 file changed, 20 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7fbb9f54/stack/rest/src/main/java/org/apache/usergrid/rest/MigrateResource.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/MigrateResource.java b/stack/rest/src/main/java/org/apache/usergrid/rest/MigrateResource.java
index ed0604a..da0ba0f 100644
--- a/stack/rest/src/main/java/org/apache/usergrid/rest/MigrateResource.java
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/MigrateResource.java
@@ -130,23 +130,37 @@ public class MigrateResource extends AbstractContextResource {
         Preconditions.checkNotNull( json, "You must provide a json body" );
         Preconditions.checkArgument( json.keySet().size() > 0, "You must specify at least one module and version" );
 
+        ApiResponse response = createApiResponse();
+        response.setAction("Set Migration Versions");
+
+        ObjectNode node = JsonNodeFactory.instance.objectNode();
+
+        final DataMigrationManager dataMigrationManager = getDataMigrationManager();
+        final Set<String> plugins = dataMigrationManager.getPluginNames();
+
         /**
          *  Set the migration version for the plugins specified
          */
         for ( final String key : json.keySet() ) {
-            String version = ( String ) json.get( key );
 
-            Preconditions.checkArgument( version != null && version.length() > 0,
-                "You must specify a version field per module name" );
+            int version = ( int ) json.get( key );
 
+            dataMigrationManager.resetToVersion(key, version);
+        }
 
-            int intVersion = Integer.parseInt( version );
 
-            getDataMigrationManager().resetToVersion( key, intVersion );
+        /**
+         *  Echo back a response of the current versions for all plugins
+         */
+        for(final String pluginName: plugins){
+            node.put(pluginName, dataMigrationManager.getCurrentVersion(pluginName));
         }
 
 
-        return migrateStatus( ui, callback );
+        response.setData( node );
+        response.setSuccess();
+
+        return new JSONWithPadding( response, callback );
     }
 
 


[13/14] incubator-usergrid git commit: Merge branch 'pr/304' into USERGRID-840

Posted by to...@apache.org.
Merge branch 'pr/304' into USERGRID-840


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/a9c9581b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/a9c9581b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/a9c9581b

Branch: refs/heads/observable-query-fix
Commit: a9c9581b59b666a5319ad0bb040c02fd18d610df
Parents: d342875 a721558
Author: Shawn Feldman <sf...@apache.org>
Authored: Mon Jul 13 18:19:48 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Mon Jul 13 18:19:48 2015 -0600

----------------------------------------------------------------------
 .../asyncevents/AmazonAsyncEventService.java    |  3 +-
 .../usergrid/persistence/queue/QueueScope.java  | 14 +++
 .../persistence/queue/impl/QueueScopeImpl.java  |  7 +-
 .../queue/impl/SNSQueueManagerImpl.java         | 96 ++++++++++++++------
 .../queue/util/AmazonNotificationUtils.java     | 42 +++++++++
 .../persistence/queue/QueueManagerTest.java     |  2 +-
 .../notifications/NotificationsService.java     |  2 +-
 .../services/notifications/QueueListener.java   |  2 +-
 .../usergrid/services/queues/QueueListener.java |  2 +-
 9 files changed, 134 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a9c9581b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --cc stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index 70eb361,bd97d66..11ca2d3
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@@ -106,10 -104,9 +107,10 @@@ public class AmazonAsyncEventService im
  
          this.indexService = indexService;
          this.entityCollectionManagerFactory = entityCollectionManagerFactory;
 -        this.rxTaskScheduler = rxTaskScheduler;
 +        this.indexLocationStrategyFactory = indexLocationStrategyFactory;
 +        this.entityIndexFactory = entityIndexFactory;
  
-         final QueueScope queueScope = new QueueScopeImpl(QUEUE_NAME);
+         this.queueScope = new QueueScopeImpl(QUEUE_NAME, QueueScope.RegionImplementation.ALLREGIONS);
          this.queue = queueManagerFactory.getQueueManager(queueScope);
          this.indexProcessorFig = indexProcessorFig;
  

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a9c9581b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
----------------------------------------------------------------------
diff --cc stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
index 1214a39,60138ee..f41d238
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
@@@ -372,9 -407,9 +408,9 @@@ public class SNSQueueManagerImpl implem
  
          if (logger.isDebugEnabled()) logger.debug("Publishing Message...{} to arn: {}", stringBody, topicArn);
  
 -        PublishRequest publishRequest = new PublishRequest(topicArn, toString(body));
 +        PublishRequest publishRequest = new PublishRequest(topicArn, stringBody);
  
-         sns.publishAsync(publishRequest, new AsyncHandler<PublishRequest, PublishResult>() {
+         snsAsync.publishAsync(publishRequest, new AsyncHandler<PublishRequest, PublishResult>() {
                  @Override
                  public void onError(Exception e) {
                      logger.error("Error publishing message... {}", e);


[03/14] incubator-usergrid git commit: Merge branch 'two-dot-o-dev' into pr/303

Posted by to...@apache.org.
Merge branch 'two-dot-o-dev' into pr/303


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/1adcfd29
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/1adcfd29
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/1adcfd29

Branch: refs/heads/observable-query-fix
Commit: 1adcfd292ad42fdc65ace103fb8f3b54e51f2cb4
Parents: 7fbb9f5 a842d57
Author: Shawn Feldman <sf...@apache.org>
Authored: Fri Jul 10 16:19:19 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Fri Jul 10 16:19:19 2015 -0600

----------------------------------------------------------------------
 .../rx/ObservableToBlockingIteratorFactory.java | 107 ++++++++++++-------
 .../persistence/core/rx/OrderedMergeTest.java   |  35 ++++--
 2 files changed, 96 insertions(+), 46 deletions(-)
----------------------------------------------------------------------



[10/14] incubator-usergrid git commit: Add support for having single region and all region AWS queue implementations.

Posted by to...@apache.org.
Add support for having single region and all region AWS queue implementations.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/c467a385
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/c467a385
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/c467a385

Branch: refs/heads/observable-query-fix
Commit: c467a3858ed859443666d6f497832bae8d1652f9
Parents: ad3916f
Author: Michael Russo <mi...@gmail.com>
Authored: Mon Jul 13 16:31:18 2015 -0700
Committer: Michael Russo <mi...@gmail.com>
Committed: Mon Jul 13 16:31:18 2015 -0700

----------------------------------------------------------------------
 .../asyncevents/AmazonAsyncEventService.java          |  3 ++-
 .../apache/usergrid/persistence/queue/QueueScope.java | 14 ++++++++++++++
 .../persistence/queue/impl/QueueScopeImpl.java        |  7 ++++++-
 .../persistence/queue/impl/SNSQueueManagerImpl.java   |  4 ++--
 .../usergrid/persistence/queue/QueueManagerTest.java  |  2 +-
 .../services/notifications/NotificationsService.java  |  2 +-
 .../services/notifications/QueueListener.java         |  2 +-
 .../usergrid/services/queues/QueueListener.java       |  2 +-
 8 files changed, 28 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c467a385/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index c5b836b..bd97d66 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -73,6 +73,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
     private static final String QUEUE_NAME = "es_queue";
 
     private final QueueManager queue;
+    private final QueueScope queueScope;
     private final IndexProcessorFig indexProcessorFig;
     private final IndexService indexService;
     private final EntityCollectionManagerFactory entityCollectionManagerFactory;
@@ -105,7 +106,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
         this.entityCollectionManagerFactory = entityCollectionManagerFactory;
         this.rxTaskScheduler = rxTaskScheduler;
 
-        final QueueScope queueScope = new QueueScopeImpl(QUEUE_NAME);
+        this.queueScope = new QueueScopeImpl(QUEUE_NAME, QueueScope.RegionImplementation.ALLREGIONS);
         this.queue = queueManagerFactory.getQueueManager(queueScope);
         this.indexProcessorFig = indexProcessorFig;
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c467a385/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueScope.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueScope.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueScope.java
index cf6bf24..4beacf6 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueScope.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/QueueScope.java
@@ -24,8 +24,22 @@ import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 public interface QueueScope  {
 
     /**
+     * LOCALREGION will create a SNS topic with a queue subscription in a single AWS region.
+     * ALLREGIONS will create SNS topics and queue subscriptions  in ALL AWS regions.
+     */
+    enum RegionImplementation {
+        LOCALREGION,
+        ALLREGIONS
+    }
+
+    /**
      * Get the name of the the map
      * @return
      */
     public String getName();
+
+    /**
+     * Get the Usergrid region enum
+     */
+    public RegionImplementation getRegionImplementation();
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c467a385/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueScopeImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueScopeImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueScopeImpl.java
index 381cd8e..09a0bcd 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueScopeImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/QueueScopeImpl.java
@@ -26,9 +26,11 @@ import org.apache.usergrid.persistence.queue.QueueScope;
 public class QueueScopeImpl implements QueueScope {
 
     private final String name;
+    private final RegionImplementation regionImpl;
 
-    public QueueScopeImpl(  final String name ) {
+    public QueueScopeImpl(  final String name, final RegionImplementation regionImpl) {
         this.name = name;
+        this.regionImpl = regionImpl;
     }
 
 
@@ -40,6 +42,9 @@ public class QueueScopeImpl implements QueueScope {
     }
 
     @Override
+    public RegionImplementation getRegionImplementation() {return regionImpl;}
+
+    @Override
     public boolean equals( final Object o ) {
         if ( this == o ) {
             return true;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c467a385/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
index c5a0f30..60138ee 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
@@ -158,7 +158,7 @@ public class SNSQueueManagerImpl implements QueueManager {
             logger.error(String.format("Unable to subscribe PRIMARY queue=[%s] to topic=[%s]", queueUrl, primaryTopicArn), e);
         }
 
-        if (fig.isMultiRegion()) {
+        if (fig.isMultiRegion() && scope.getRegionImplementation() == QueueScope.RegionImplementation.ALLREGIONS) {
 
             String multiRegion = fig.getRegionList();
 
@@ -299,7 +299,7 @@ public class SNSQueueManagerImpl implements QueueManager {
 
 
     private String getName() {
-        String name = clusterFig.getClusterName() + "_" + scope.getName();
+        String name = clusterFig.getClusterName() + "_" + scope.getName() + "_" + scope.getRegionImplementation();
 
         Preconditions.checkArgument(name.length() <= 80, "Your name must be < than 80 characters");
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c467a385/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
index 33fa1f5..0ed6065 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/queue/QueueManagerTest.java
@@ -65,7 +65,7 @@ public class QueueManagerTest {
 
     @Before
     public void mockApp() {
-        this.scope = new QueueScopeImpl(  "testQueue" );
+        this.scope = new QueueScopeImpl( "testQueue", QueueScope.RegionImplementation.LOCALREGION );
         qm = qmf.getQueueManager(scope);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c467a385/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
index 76f10c9..130756d 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/NotificationsService.java
@@ -108,7 +108,7 @@ public class NotificationsService extends AbstractCollectionService {
         postTimer = metricsService.getTimer(this.getClass(), "collection.post_requests");
         JobScheduler jobScheduler = new JobScheduler(sm,em);
         String name = ApplicationQueueManagerImpl.getQueueNames( props );
-        QueueScope queueScope = new QueueScopeImpl( name );
+        QueueScope queueScope = new QueueScopeImpl( name, QueueScope.RegionImplementation.LOCALREGION );
         queueManagerFactory = getApplicationContext().getBean( Injector.class ).getInstance(QueueManagerFactory.class);
         QueueManager queueManager = queueManagerFactory.getQueueManager(queueScope);
         notificationQueueManager = new ApplicationQueueManagerImpl(jobScheduler,em,queueManager,metricsService,props);

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c467a385/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
index 8c765ac..eba0060 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java
@@ -146,7 +146,7 @@ public class QueueListener  {
         com.codahale.metrics.Timer timer = metricsService.getTimer(QueueListener.class, "execute.dequeue");
         svcMgr = smf.getServiceManager(smf.getManagementAppId());
         LOG.info("getting from queue {} ", queueName);
-        QueueScope queueScope = new QueueScopeImpl( queueName ) {};
+        QueueScope queueScope = new QueueScopeImpl( queueName, QueueScope.RegionImplementation.LOCALREGION );
         QueueManager queueManager = TEST_QUEUE_MANAGER != null ? TEST_QUEUE_MANAGER : queueManagerFactory.getQueueManager(queueScope);
         // run until there are no more active jobs
         final AtomicLong runCount = new AtomicLong(0);

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/c467a385/stack/services/src/main/java/org/apache/usergrid/services/queues/QueueListener.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/services/queues/QueueListener.java b/stack/services/src/main/java/org/apache/usergrid/services/queues/QueueListener.java
index d38db95..bd57bb3 100644
--- a/stack/services/src/main/java/org/apache/usergrid/services/queues/QueueListener.java
+++ b/stack/services/src/main/java/org/apache/usergrid/services/queues/QueueListener.java
@@ -160,7 +160,7 @@ public abstract class QueueListener  {
         LOG.info("QueueListener: Starting execute process.");
         svcMgr = smf.getServiceManager(smf.getManagementAppId());
         LOG.info("getting from queue {} ", queueName);
-        QueueScope queueScope = new QueueScopeImpl( queueName);
+        QueueScope queueScope = new QueueScopeImpl( queueName, QueueScope.RegionImplementation.LOCALREGION);
         QueueManager queueManager = TEST_QUEUE_MANAGER != null ? TEST_QUEUE_MANAGER : queueManagerFactory.getQueueManager(queueScope);
         // run until there are no more active jobs
         long runCount = 0;


[06/14] incubator-usergrid git commit: adding async events

Posted by to...@apache.org.
adding async events


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/cbea6e0b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/cbea6e0b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/cbea6e0b

Branch: refs/heads/observable-query-fix
Commit: cbea6e0bdf20d975e538b5ac6ec9b9b7110c0367
Parents: de1daae
Author: Shawn Feldman <sf...@apache.org>
Authored: Mon Jul 13 11:15:24 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Mon Jul 13 11:15:24 2015 -0600

----------------------------------------------------------------------
 .../asyncevents/AmazonAsyncEventService.java    | 19 +++++++----
 .../asyncevents/AsyncEventService.java          |  8 +++++
 .../asyncevents/InMemoryAsyncEventService.java  | 14 ++++++++
 .../asyncevents/model/AsyncEvent.java           | 17 ++++++++-
 .../model/InitializeApplicationIndexEvent.java  | 36 ++++++++++++++++++++
 .../model/InitializeManagementIndexEvent.java   | 32 +++++++++++++++++
 .../index/ApplicationIndexLocationStrategy.java |  7 ----
 .../index/IndexLocationStrategyFactoryImpl.java |  2 +-
 8 files changed, 120 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cbea6e0b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index c5b836b..e15824f 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -30,6 +30,7 @@ import com.google.common.base.Preconditions;
 import org.apache.usergrid.corepersistence.CpEntityManager;
 import org.apache.usergrid.corepersistence.asyncevents.model.*;
 import org.apache.usergrid.corepersistence.rx.impl.EdgeScope;
+import org.apache.usergrid.persistence.index.IndexLocationStrategy;
 import org.apache.usergrid.utils.UUIDUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -76,11 +77,9 @@ public class AmazonAsyncEventService implements AsyncEventService {
     private final IndexProcessorFig indexProcessorFig;
     private final IndexService indexService;
     private final EntityCollectionManagerFactory entityCollectionManagerFactory;
-    private final RxTaskScheduler rxTaskScheduler;
 
     private final Timer readTimer;
     private final Timer writeTimer;
-    private final Timer messageProcessingTimer;
 
     private final Object mutex = new Object();
 
@@ -98,12 +97,11 @@ public class AmazonAsyncEventService implements AsyncEventService {
                                    final IndexProcessorFig indexProcessorFig,
                                    final MetricsFactory metricsFactory,
                                    final IndexService indexService,
-                                   final EntityCollectionManagerFactory entityCollectionManagerFactory,
-                                   final RxTaskScheduler rxTaskScheduler) {
+                                   final EntityCollectionManagerFactory entityCollectionManagerFactory
+    ) {
 
         this.indexService = indexService;
         this.entityCollectionManagerFactory = entityCollectionManagerFactory;
-        this.rxTaskScheduler = rxTaskScheduler;
 
         final QueueScope queueScope = new QueueScopeImpl(QUEUE_NAME);
         this.queue = queueManagerFactory.getQueueManager(queueScope);
@@ -111,7 +109,6 @@ public class AmazonAsyncEventService implements AsyncEventService {
 
         this.writeTimer = metricsFactory.getTimer(AmazonAsyncEventService.class, "async_event.write");
         this.readTimer = metricsFactory.getTimer(AmazonAsyncEventService.class, "async_event.read");
-        this.messageProcessingTimer = metricsFactory.getTimer(AmazonAsyncEventService.class, "async_event.message_processing");
         this.indexErrorCounter = metricsFactory.getCounter(AmazonAsyncEventService.class, "async_event.error");
         this.messageCycle = metricsFactory.getHistogram(AmazonAsyncEventService.class, "async_event.message_cycle");
 
@@ -244,6 +241,16 @@ public class AmazonAsyncEventService implements AsyncEventService {
 
 
     @Override
+    public void queueInitializeApplicationIndex( final ApplicationScope applicationScope) {
+        offer( new InitializeApplicationIndexEvent( applicationScope  ) );
+    }
+
+    @Override
+    public void queueInitializeManagementIndex() {
+        offer( new InitializeManagementIndexEvent( ) );
+    }
+
+    @Override
     public void queueEntityIndexUpdate(final ApplicationScope applicationScope,
                                        final Entity entity) {
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cbea6e0b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
index 9fbed39..1c97c3e 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
@@ -23,6 +23,7 @@ package org.apache.usergrid.corepersistence.asyncevents;
 import org.apache.usergrid.corepersistence.index.ReIndexAction;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.index.IndexLocationStrategy;
 import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.model.entity.Id;
 
@@ -34,6 +35,13 @@ public interface AsyncEventService extends ReIndexAction {
 
 
     /**
+     * Initialize index for creation
+     * @param applicationScope
+     */
+    void queueInitializeApplicationIndex( final ApplicationScope applicationScope );
+    void queueInitializeManagementIndex( );
+
+    /**
      * Queue an entity to be indexed.  This will start processing immediately. For implementations that are realtime (akka, in memory)
      * We will return a distributed future.  For SQS impls, this will return immediately, and the result will not be available.
      * After SQS is removed, the tests should be enhanced to ensure that we're processing our queues correctly.

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cbea6e0b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
index b8e544d..2fc0bb2 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
@@ -20,6 +20,8 @@
 package org.apache.usergrid.corepersistence.asyncevents;
 
 
+import com.amazonaws.services.opsworks.model.App;
+import org.apache.usergrid.persistence.index.IndexLocationStrategy;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -63,6 +65,18 @@ public class InMemoryAsyncEventService implements AsyncEventService {
 
 
     @Override
+    public void queueInitializeApplicationIndex(final ApplicationScope applicationScope) {
+        //index will be initialized locally, don't need to inform other indexes
+        return;
+    }
+
+    @Override
+    public void queueInitializeManagementIndex() {
+
+    }
+
+
+    @Override
     public void queueEntityIndexUpdate( final ApplicationScope applicationScope, final Entity entity ) {
 
         //process the entity immediately

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cbea6e0b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
index 66476f9..b331b6e 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
@@ -61,6 +61,12 @@ public class AsyncEvent implements Serializable {
     protected AsyncEvent() {
     }
 
+    public AsyncEvent(final EventType eventType) {
+
+        this.eventType = eventType;
+        this.creationTime = System.currentTimeMillis();
+    }
+
     public AsyncEvent(final EventType eventType,
                       final EntityIdScope entityIdScope) {
 
@@ -69,6 +75,12 @@ public class AsyncEvent implements Serializable {
         this.creationTime = System.currentTimeMillis();
     }
 
+    public AsyncEvent(EventType eventType, ApplicationScope applicationScope) {
+        this.eventType = eventType;
+        this.applicationScope = applicationScope;
+        this.creationTime = System.currentTimeMillis();
+    }
+
     public AsyncEvent(EventType eventType, ApplicationScope applicationScope, Edge edge) {
         this.eventType = eventType;
         this.applicationScope = applicationScope;
@@ -136,7 +148,10 @@ public class AsyncEvent implements Serializable {
         EDGE_DELETE,
         EDGE_INDEX,
         ENTITY_DELETE,
-        ENTITY_INDEX;
+        ENTITY_INDEX,
+        APPLICATION_INDEX,
+        MANAGEMENT_INDEX;
+        ;
 
 
         public String asString() {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cbea6e0b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/InitializeApplicationIndexEvent.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/InitializeApplicationIndexEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/InitializeApplicationIndexEvent.java
new file mode 100644
index 0000000..6612e8b
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/InitializeApplicationIndexEvent.java
@@ -0,0 +1,36 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  *  contributor license agreements.  The ASF licenses this file to You
+ *  * under the Apache License, Version 2.0 (the "License"); you may not
+ *  * use this file except in compliance with the License.
+ *  * You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.  For additional information regarding
+ *  * copyright in this work, please see the NOTICE file in the top level
+ *  * directory of this distribution.
+ *
+ */
+package org.apache.usergrid.corepersistence.asyncevents.model;
+
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+
+/**
+ * event to init app index
+ */
+@JsonDeserialize(as = AsyncEvent.class)
+public class InitializeApplicationIndexEvent extends AsyncEvent {
+    public InitializeApplicationIndexEvent() {
+    }
+    public InitializeApplicationIndexEvent(final ApplicationScope applicationScope) {
+        super(EventType.APPLICATION_INDEX, applicationScope);
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cbea6e0b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/InitializeManagementIndexEvent.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/InitializeManagementIndexEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/InitializeManagementIndexEvent.java
new file mode 100644
index 0000000..0af249d
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/InitializeManagementIndexEvent.java
@@ -0,0 +1,32 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  *  contributor license agreements.  The ASF licenses this file to You
+ *  * under the Apache License, Version 2.0 (the "License"); you may not
+ *  * use this file except in compliance with the License.
+ *  * You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.  For additional information regarding
+ *  * copyright in this work, please see the NOTICE file in the top level
+ *  * directory of this distribution.
+ *
+ */
+package org.apache.usergrid.corepersistence.asyncevents.model;
+
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+
+/**
+ * Event to initialize mgmt index
+ */
+@JsonDeserialize(as = AsyncEvent.class)
+public class InitializeManagementIndexEvent extends AsyncEvent{
+    public InitializeManagementIndexEvent(){
+        super(EventType.MANAGEMENT_INDEX);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cbea6e0b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ApplicationIndexLocationStrategy.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ApplicationIndexLocationStrategy.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ApplicationIndexLocationStrategy.java
index fcfb09b..c105119 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ApplicationIndexLocationStrategy.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ApplicationIndexLocationStrategy.java
@@ -31,11 +31,8 @@ import org.apache.usergrid.utils.StringUtils;
  * Strategy for getting the application index name.
  */
 class ApplicationIndexLocationStrategy implements IndexLocationStrategy {
-    private final ClusterFig clusterFig;
-    private final CassandraFig cassandraFig;
     private final IndexFig indexFig;
     private final ApplicationScope applicationScope;
-    private final ApplicationIndexBucketLocator applicationIndexBucketLocator;
     private final String indexBucketName;
     private final IndexAlias alias;
     private final String indexRootName;
@@ -45,12 +42,8 @@ class ApplicationIndexLocationStrategy implements IndexLocationStrategy {
                                             final IndexFig indexFig,
                                             final ApplicationScope applicationScope,
                                             final ApplicationIndexBucketLocator applicationIndexBucketLocator){
-        this.clusterFig = clusterFig;
-
-        this.cassandraFig = cassandraFig;
         this.indexFig = indexFig;
         this.applicationScope = applicationScope;
-        this.applicationIndexBucketLocator = applicationIndexBucketLocator;
         this.indexRootName  = clusterFig.getClusterName() + "_" + cassandraFig.getApplicationKeyspace().toLowerCase();
         this.alias =  new ApplicationIndexAlias(indexFig, applicationScope, indexRootName);
         this.indexBucketName = indexRootName + "_applications_" + applicationIndexBucketLocator.getBucket(applicationScope);

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cbea6e0b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexLocationStrategyFactoryImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexLocationStrategyFactoryImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexLocationStrategyFactoryImpl.java
index 2d71e41..6a99890 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexLocationStrategyFactoryImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexLocationStrategyFactoryImpl.java
@@ -47,7 +47,7 @@ public class IndexLocationStrategyFactoryImpl implements IndexLocationStrategyFa
         this.applicationLocatorBucketStrategy = applicationLocatorBucketStrategy;
         this.coreIndexFig = coreIndexFig;
     }
-    public IndexLocationStrategy getIndexLocationStrategy(ApplicationScope applicationScope){
+    public IndexLocationStrategy getIndexLocationStrategy( final ApplicationScope applicationScope){
         if(CpNamingUtils.getManagementApplicationId().equals(applicationScope.getApplication())){
             return new ManagementIndexLocationStrategy(clusterFig,cassandraFig,indexFig,coreIndexFig);
         }


[04/14] incubator-usergrid git commit: Revert "adding offer instead of put"

Posted by to...@apache.org.
Revert "adding offer instead of put"

This reverts commit a842d5700e9d374fc091a3085ddf5e7f30bf806a.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/de1daaeb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/de1daaeb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/de1daaeb

Branch: refs/heads/observable-query-fix
Commit: de1daaeb19c438f458cb608cfa9ede4ede5f8f97
Parents: 1adcfd2
Author: Shawn Feldman <sf...@apache.org>
Authored: Fri Jul 10 16:44:10 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Fri Jul 10 16:44:10 2015 -0600

----------------------------------------------------------------------
 .../rx/ObservableToBlockingIteratorFactory.java | 107 +++++++------------
 .../persistence/core/rx/OrderedMergeTest.java   |  35 ++----
 2 files changed, 46 insertions(+), 96 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/de1daaeb/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ObservableToBlockingIteratorFactory.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ObservableToBlockingIteratorFactory.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ObservableToBlockingIteratorFactory.java
index ec5056e..9807749 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ObservableToBlockingIteratorFactory.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ObservableToBlockingIteratorFactory.java
@@ -31,8 +31,6 @@ import java.util.Iterator;
 import java.util.NoSuchElementException;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.TimeUnit;
 
 /**
  * Returns an Iterator that iterates over all items emitted by a specified Observable.
@@ -56,7 +54,7 @@ public final class ObservableToBlockingIteratorFactory {
      * @return the iterator that could be used to iterate over the elements of the observable.
      */
     public static <T> Iterator<T> toIterator(Observable<? extends T> source) {
-        final BlockingQueue<Notification<? extends T>> notifications = new SynchronousQueue<>(true);
+        final BlockingQueue<Notification<? extends T>> notifications = new ArrayBlockingQueue<>(1);
 
         // using subscribe instead of unsafeSubscribe since this is a BlockingObservable "final subscribe"
         final Subscription subscription = source.materialize().subscribe(new Subscriber<Notification<? extends T>>() {
@@ -67,88 +65,61 @@ public final class ObservableToBlockingIteratorFactory {
 
             @Override
             public void onError(Throwable e) {
-                boolean offerFinished = false;
                 try{
-                    do {
-                        offerFinished = notifications.offer(Notification.<T>createOnError(e), 1000, TimeUnit.MILLISECONDS);
-                    }while (!offerFinished && !this.isUnsubscribed());
-                }catch (InterruptedException t){
+                    notifications.put(Notification.<T>createOnError(e));
+                }catch (Exception t){
 
                 }
             }
 
             @Override
             public void onNext(Notification<? extends T> args) {
-                boolean offerFinished = false;
+                try{
+                    notifications.put(args);
+                }catch (Exception t){
 
-                try {
-                    do {
-                        offerFinished =  notifications.offer(args, 1000, TimeUnit.MILLISECONDS);
-                    } while (!offerFinished && !this.isUnsubscribed());
+                }
+            }
+        });
 
-                } catch (InterruptedException t) {
+        return new Iterator<T>() {
+            private Notification<? extends T> buf;
 
+            @Override
+            public boolean hasNext() {
+                if (buf == null) {
+                    buf = take();
                 }
+                if (buf.isOnError()) {
+                    throw Exceptions.propagate(buf.getThrowable());
+                }
+                return !buf.isOnCompleted();
             }
 
             @Override
-            protected void finalize() throws Throwable {
-                super.finalize();
+            public T next() {
+                if (hasNext()) {
+                    T result = buf.getValue();
+                    buf = null;
+                    return result;
+                }
+                throw new NoSuchElementException();
             }
-        });
-
-        return new ObservableBlockingIterator<T>(notifications,subscription);
-    }
-
-    private static class ObservableBlockingIterator<T> implements Iterator<T> {
-        private final BlockingQueue<Notification<? extends T>> notifications;
-        private final Subscription subscription;
 
-        public ObservableBlockingIterator(BlockingQueue<Notification<? extends T>> notifications, Subscription subscription) {
-            this.notifications = notifications;
-            this.subscription = subscription;
-        }
-
-        private Notification<? extends T> buf;
-
-        @Override
-        public boolean hasNext() {
-            if (buf == null) {
-                buf = take();
-            }
-            if (buf.isOnError()) {
-                throw Exceptions.propagate(buf.getThrowable());
-            }
-            return !buf.isOnCompleted();
-        }
-
-        @Override
-        public T next() {
-            if (hasNext()) {
-                T result = buf.getValue();
-                buf = null;
-                return result;
-            }
-            throw new NoSuchElementException();
-        }
-
-        private Notification<? extends T> take() {
-            try {
-                return notifications.take();
-            } catch (InterruptedException e) {
-                subscription.unsubscribe();
-                throw Exceptions.propagate(e);
+            private Notification<? extends T> take() {
+                try {
+                    return notifications.take();
+                } catch (InterruptedException e) {
+                    subscription.unsubscribe();
+                    throw Exceptions.propagate(e);
+                }
             }
-        }
-
-        @Override
-        public void remove() {
-            throw new UnsupportedOperationException("Read-only iterator");
-        }
 
-        @Override
-        protected void finalize() throws Throwable {
-            super.finalize();
-        }
+            @Override
+            public void remove() {
+                throw new UnsupportedOperationException("Read-only iterator");
+            }
+        };
     }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/de1daaeb/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/rx/OrderedMergeTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/rx/OrderedMergeTest.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/rx/OrderedMergeTest.java
index a81ef8f..649ac7a 100644
--- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/rx/OrderedMergeTest.java
+++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/rx/OrderedMergeTest.java
@@ -547,7 +547,7 @@ public class OrderedMergeTest {
                 //pull from source
                 for (int i = 0; i < 10 && !subscriber.isUnsubscribed(); i++) {
                     //emit
-                    log.info("inner produce " + count);
+                    log.info("loop " + count);
                     subscriber.onNext(count++);
                 }
             }
@@ -559,34 +559,13 @@ public class OrderedMergeTest {
                 log.info("iteration " + o);
             }).subscribeOn(Schedulers.io()));
         //never
-        for(int i =0; i<20;i++){
-            Object it =iterator.next();
-            log.info("iterate "+i);
-        }
-
-        iterator = ObservableToBlockingIteratorFactory.toIterator(Observable.create(subscriber -> {
-            int count = 0;
-            while (!subscriber.isUnsubscribed()) {
-                //pull from source
-                for (int i = 0; i < 10 && !subscriber.isUnsubscribed(); i++) {
-                    //emit
-                    log.info("inner produce " + count);
-                    subscriber.onNext(count++);
-                }
-            }
+        Object it =iterator.next();
+        it = iterator.next();
+        log.info("iterate");
+        it = iterator.next();
+        log.info("iterate");
 
-            subscriber.onCompleted();
-        })
-            .onBackpressureBlock(1)
-            .buffer(2)
-            .doOnNext(o -> {
-                log.info("iteration " + o);
-            }).subscribeOn(Schedulers.io()));
-        //never
-        for(int i =0; i<20;i++){
-            Object it =iterator.next();
-            log.info("iterate "+i);
-        }
+        Object size = it;
     }
 
 


[14/14] incubator-usergrid git commit: Merge branch 'two-dot-o-dev' of https://git-wip-us.apache.org/repos/asf/incubator-usergrid into observable-query-fix

Posted by to...@apache.org.
Merge branch 'two-dot-o-dev' of https://git-wip-us.apache.org/repos/asf/incubator-usergrid into observable-query-fix


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/7256ea73
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/7256ea73
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/7256ea73

Branch: refs/heads/observable-query-fix
Commit: 7256ea731520ebf42e0c343a907410af35776411
Parents: 208332d a9c9581
Author: Todd Nine <tn...@apigee.com>
Authored: Tue Jul 14 11:09:57 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Tue Jul 14 11:09:57 2015 -0600

----------------------------------------------------------------------
 .../corepersistence/CpEntityManagerFactory.java |   4 +-
 .../asyncevents/AmazonAsyncEventService.java    |  46 +++++--
 .../asyncevents/AsyncEventService.java          |   8 ++
 .../asyncevents/AsyncIndexProvider.java         |  14 +-
 .../asyncevents/InMemoryAsyncEventService.java  |   8 ++
 .../asyncevents/model/AsyncEvent.java           |  29 +++-
 .../model/InitializeApplicationIndexEvent.java  |  38 +++++
 .../index/ApplicationIndexLocationStrategy.java |   7 -
 .../index/IndexLocationStrategyFactoryImpl.java |   2 +-
 .../index/ReplicatedIndexLocationStrategy.java  | 137 +++++++++++++++++++
 .../index/AmazonAsyncEventServiceTest.java      |  10 +-
 .../corepersistence/index/IndexNamingTest.java  |  18 ++-
 .../usergrid/persistence/queue/QueueScope.java  |  14 ++
 .../persistence/queue/impl/QueueScopeImpl.java  |   7 +-
 .../queue/impl/SNSQueueManagerImpl.java         | 101 +++++++++-----
 .../queue/util/AmazonNotificationUtils.java     |  42 ++++++
 .../persistence/queue/QueueManagerTest.java     |   2 +-
 .../apache/usergrid/rest/MigrateResource.java   |  26 +++-
 .../notifications/NotificationsService.java     |   2 +-
 .../services/notifications/QueueListener.java   |   2 +-
 .../usergrid/services/queues/QueueListener.java |   2 +-
 21 files changed, 452 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7256ea73/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
----------------------------------------------------------------------


[12/14] incubator-usergrid git commit: Add exception to failed policy log statement.

Posted by to...@apache.org.
Add exception to failed policy log statement.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/a7215580
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/a7215580
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/a7215580

Branch: refs/heads/observable-query-fix
Commit: a721558092fe9a04d9f96cd4ac020e6b430cc359
Parents: c467a38
Author: Michael Russo <mi...@gmail.com>
Authored: Mon Jul 13 17:03:13 2015 -0700
Committer: Michael Russo <mi...@gmail.com>
Committed: Mon Jul 13 17:03:13 2015 -0700

----------------------------------------------------------------------
 .../usergrid/persistence/queue/util/AmazonNotificationUtils.java   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a7215580/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/util/AmazonNotificationUtils.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/util/AmazonNotificationUtils.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/util/AmazonNotificationUtils.java
index 52a4925..1d86823 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/util/AmazonNotificationUtils.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/util/AmazonNotificationUtils.java
@@ -96,7 +96,7 @@ public class AmazonNotificationUtils {
         try {
             sqs.setQueueAttributes(queueAttributesRequest);
         }catch (Exception e){
-            logger.error("Failed to set permissions on QUEUE ARN=[{}] for TOPIC ARNs=[{}]", queueARN, topicARNs.toString());
+            logger.error("Failed to set permissions on QUEUE ARN=[{}] for TOPIC ARNs=[{}]", queueARN, topicARNs.toString(), e);
         }
 
 


[05/14] incubator-usergrid git commit: Ensure all queues are subscribed to the multi-region SNS topics with appropriate permissions. Topic/queue creation converted back to synchronous SNS client, leaving message send still using the async client.

Posted by to...@apache.org.
Ensure all queues are subscribed to the multi-region SNS topics with appropriate permissions.  Topic/queue creation converted back to synchronous SNS client, leaving message send still using the async client.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/ad3916ff
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/ad3916ff
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/ad3916ff

Branch: refs/heads/observable-query-fix
Commit: ad3916ffc9ec2285f88ec62c4752065a8117147c
Parents: 48689eb
Author: Michael Russo <mi...@gmail.com>
Authored: Sat Jul 11 17:25:20 2015 -0700
Committer: Michael Russo <mi...@gmail.com>
Committed: Sun Jul 12 16:35:54 2015 -0700

----------------------------------------------------------------------
 .../queue/impl/SNSQueueManagerImpl.java         | 92 ++++++++++++++------
 .../queue/util/AmazonNotificationUtils.java     | 42 +++++++++
 2 files changed, 106 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ad3916ff/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
index 257f25e..c5a0f30 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/impl/SNSQueueManagerImpl.java
@@ -23,8 +23,8 @@ import com.amazonaws.handlers.AsyncHandler;
 import com.amazonaws.regions.Region;
 import com.amazonaws.regions.Regions;
 import com.amazonaws.services.sns.AmazonSNSAsyncClient;
+import com.amazonaws.services.sns.AmazonSNSClient;
 import com.amazonaws.services.sns.model.*;
-import com.amazonaws.services.sns.util.Topics;
 import com.amazonaws.services.sqs.AmazonSQSClient;
 import com.amazonaws.services.sqs.model.*;
 import com.fasterxml.jackson.core.JsonFactory;
@@ -55,7 +55,8 @@ public class SNSQueueManagerImpl implements QueueManager {
     private final QueueFig fig;
     private final ClusterFig clusterFig;
     private final AmazonSQSClient sqs;
-    private final AmazonSNSAsyncClient sns;
+    private final AmazonSNSClient sns;
+    private final AmazonSNSAsyncClient snsAsync;
 
 
     private final JsonFactory JSON_FACTORY = new JsonFactory();
@@ -112,6 +113,7 @@ public class SNSQueueManagerImpl implements QueueManager {
         try {
             sqs = createSQSClient(getRegion());
             sns = createSNSClient(getRegion());
+            snsAsync = createAsyncSNSClient(getRegion());
 
         } catch (Exception e) {
             throw new RuntimeException("Error setting up mapper", e);
@@ -144,7 +146,14 @@ public class SNSQueueManagerImpl implements QueueManager {
         }
 
         try {
-            Topics.subscribeQueue(sns, sqs, primaryTopicArn, queueUrl);
+
+            SubscribeRequest primarySubscribeRequest = new SubscribeRequest(primaryTopicArn, "sqs", primaryQueueArn);
+            sns.subscribe(primarySubscribeRequest);
+
+            // ensure the SNS primary topic has permission to send to the primary SQS queue
+            List<String> primaryTopicArnList = new ArrayList<>();
+            primaryTopicArnList.add(primaryTopicArn);
+            AmazonNotificationUtils.setQueuePermissionsToReceive(sqs, queueUrl, primaryTopicArnList);
         } catch (AmazonServiceException e) {
             logger.error(String.format("Unable to subscribe PRIMARY queue=[%s] to topic=[%s]", queueUrl, primaryTopicArn), e);
         }
@@ -167,19 +176,18 @@ public class SNSQueueManagerImpl implements QueueManager {
             for (String regionName : regionNames) {
 
                 regionName = regionName.trim();
-
                 Regions regions = Regions.fromName(regionName);
                 Region region = Region.getRegion(regions);
 
-                final AmazonSQSClient sqsClient = createSQSClient(region);
-                final AmazonSNSAsyncClient snsClient = createSNSClient(region);
+                AmazonSQSClient sqsClient = createSQSClient(region);
+                AmazonSNSClient snsClient = createSNSClient(region); // do this stuff synchronously
 
+                // getTopicArn will create the SNS topic if it doesn't exist
                 String topicArn = AmazonNotificationUtils.getTopicArn(snsClient, queueName, true);
-
                 topicArns.put(topicArn, regionName);
 
+                // create the SQS queue if it doesn't exist
                 String queueArn = AmazonNotificationUtils.getQueueArnByName(sqsClient, queueName);
-
                 if (queueArn == null) {
                     queueUrl = AmazonNotificationUtils.createQueue(sqsClient, queueName, fig);
                     queueArn = AmazonNotificationUtils.getQueueArnByUrl(sqsClient, queueUrl);
@@ -197,36 +205,41 @@ public class SNSQueueManagerImpl implements QueueManager {
                 Regions sqsRegions = Regions.fromName(strSqsRegion);
                 Region sqsRegion = Region.getRegion(sqsRegions);
 
-                final AmazonSQSClient sqsClient = createSQSClient(sqsRegion);
+                AmazonSQSClient subscribeSqsClient = createSQSClient(sqsRegion);
 
-                logger.info("Creating subscriptions for QUEUE ARN=[{}]", queueARN);
+                // ensure the URL used to subscribe is for the correct name/region
+                String subscribeQueueUrl = AmazonNotificationUtils.getQueueUrlByName(subscribeSqsClient, queueName);
 
-                for (Map.Entry<String, String> topicArnEntry : topicArns.entrySet()) {
-                    String topicARN = topicArnEntry.getKey();
+                // this list used later for adding permissions to queues
+                List<String> topicArnList = new ArrayList<>();
 
-                    logger.info("Creating subscriptions for TOPIC ARN=[{}]", topicArnEntry.getKey());
+                for (Map.Entry<String, String> topicArnEntry : topicArns.entrySet()) {
 
-                    String strSnsRegion = queueArnEntry.getValue();
+                    String topicARN = topicArnEntry.getKey();
+                    topicArnList.add(topicARN);
 
+                    String strSnsRegion = topicArnEntry.getValue();
                     Regions snsRegions = Regions.fromName(strSnsRegion);
                     Region snsRegion = Region.getRegion(snsRegions);
 
-                    final AmazonSNSAsyncClient snsClient = createSNSClient(snsRegion);
+                    AmazonSNSClient subscribeSnsClient = createSNSClient(snsRegion); // do this stuff synchronously
+                    SubscribeRequest subscribeRequest = new SubscribeRequest(topicARN, "sqs", queueARN);
 
                     try {
 
                         logger.info("Subscribing Queue ARN/Region=[{} / {}] and Topic ARN/Region=[{} / {}]",
-                                queueARN,
-                                strSqsRegion,
-                                topicARN,
-                                strSnsRegion
+                            queueARN,
+                            strSqsRegion,
+                            topicARN,
+                            strSnsRegion
                         );
 
-                        Topics.subscribeQueue(
-                                snsClient,
-                                sqsClient,
-                                topicArnEntry.getKey(),
-                                queueArnEntry.getKey());
+                        SubscribeResult subscribeResult = subscribeSnsClient.subscribe(subscribeRequest);
+                        String subscriptionARN = subscribeResult.getSubscriptionArn();
+                        if(logger.isDebugEnabled()){
+                            logger.debug("Successfully subscribed Queue ARN=[{}] to Topic ARN=[{}], subscription ARN=[{}]", queueARN, topicARN, subscriptionARN);
+                        }
+
 
                     } catch (Exception e) {
                         logger.error(String.format("ERROR Subscribing Queue ARN/Region=[%s / %s] and Topic ARN/Region=[%s / %s]",
@@ -238,14 +251,23 @@ public class SNSQueueManagerImpl implements QueueManager {
 
                     }
                 }
+
+                logger.info("Adding permission to receive messages...");
+                // add permission to each queue, providing a list of topics that it's subscribed to
+                AmazonNotificationUtils.setQueuePermissionsToReceive(subscribeSqsClient, subscribeQueueUrl, topicArnList);
+
             }
         }
 
         return primaryTopicArn;
     }
 
+    /**
+     * The Asynchronous SNS client is used for publishing events to SNS.
+     *
+     */
 
-    private AmazonSNSAsyncClient createSNSClient(final Region region) {
+    private AmazonSNSAsyncClient createAsyncSNSClient(final Region region) {
         final UsergridAwsCredentialsProvider ugProvider = new UsergridAwsCredentialsProvider();
 
         /**
@@ -261,6 +283,20 @@ public class SNSQueueManagerImpl implements QueueManager {
         return sns;
     }
 
+    /**
+     * The Synchronous SNS client is used for creating topics and subscribing queues.
+     *
+     */
+    private AmazonSNSClient createSNSClient(final Region region) {
+        final UsergridAwsCredentialsProvider ugProvider = new UsergridAwsCredentialsProvider();
+
+        final AmazonSNSClient sns = new AmazonSNSClient(ugProvider.getCredentials());
+
+        sns.setRegion(region);
+
+        return sns;
+    }
+
 
     private String getName() {
         String name = clusterFig.getClusterName() + "_" + scope.getName();
@@ -346,7 +382,7 @@ public class SNSQueueManagerImpl implements QueueManager {
     @Override
     public void sendMessages(final List bodies) throws IOException {
 
-        if (sns == null) {
+        if (snsAsync == null) {
             logger.error("SNS client is null, perhaps it failed to initialize successfully");
             return;
         }
@@ -360,7 +396,7 @@ public class SNSQueueManagerImpl implements QueueManager {
     @Override
     public void sendMessage(final Object body) throws IOException {
 
-        if (sns == null) {
+        if (snsAsync == null) {
             logger.error("SNS client is null, perhaps it failed to initialize successfully");
             return;
         }
@@ -373,7 +409,7 @@ public class SNSQueueManagerImpl implements QueueManager {
 
         PublishRequest publishRequest = new PublishRequest(topicArn, toString(body));
 
-        sns.publishAsync(publishRequest, new AsyncHandler<PublishRequest, PublishResult>() {
+        snsAsync.publishAsync(publishRequest, new AsyncHandler<PublishRequest, PublishResult>() {
                 @Override
                 public void onError(Exception e) {
                     logger.error("Error publishing message... {}", e);

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/ad3916ff/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/util/AmazonNotificationUtils.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/util/AmazonNotificationUtils.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/util/AmazonNotificationUtils.java
index f7b2e06..52a4925 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/util/AmazonNotificationUtils.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/queue/util/AmazonNotificationUtils.java
@@ -1,6 +1,9 @@
 package org.apache.usergrid.persistence.queue.util;
 
 import com.amazonaws.AmazonServiceException;
+import com.amazonaws.auth.policy.*;
+import com.amazonaws.auth.policy.actions.SQSActions;
+import com.amazonaws.auth.policy.conditions.ConditionFactory;
 import com.amazonaws.services.sns.AmazonSNSClient;
 import com.amazonaws.services.sns.model.*;
 import com.amazonaws.services.sns.util.Topics;
@@ -11,7 +14,9 @@ import org.apache.usergrid.persistence.queue.QueueFig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -60,6 +65,43 @@ public class AmazonNotificationUtils {
         return url;
     }
 
+    public static void setQueuePermissionsToReceive(final AmazonSQSClient sqs,
+                                                    final String queueUrl,
+                                                    final List<String> topicARNs) throws Exception{
+
+        String queueARN = getQueueArnByUrl(sqs, queueUrl);
+
+        Statement statement = new Statement(Statement.Effect.Allow)
+            .withActions(SQSActions.SendMessage)
+            .withPrincipals(new Principal("*"))
+            .withResources(new Resource(queueARN));
+
+        List<Condition> conditions = new ArrayList<>();
+
+        for(String topicARN : topicARNs){
+
+            conditions.add(ConditionFactory.newSourceArnCondition(topicARN));
+
+        }
+        statement.setConditions(conditions);
+
+        Policy policy = new Policy("SubscriptionPermission").withStatements(statement);
+
+
+        final Map<String, String> queueAttributes = new HashMap<>();
+        queueAttributes.put("Policy", policy.toJson());
+
+        SetQueueAttributesRequest queueAttributesRequest = new SetQueueAttributesRequest(queueUrl, queueAttributes);
+
+        try {
+            sqs.setQueueAttributes(queueAttributesRequest);
+        }catch (Exception e){
+            logger.error("Failed to set permissions on QUEUE ARN=[{}] for TOPIC ARNs=[{}]", queueARN, topicARNs.toString());
+        }
+
+
+    }
+
 
     public static String getQueueArnByName(final AmazonSQSClient sqs,
                                            final String queueName)


[07/14] incubator-usergrid git commit: adding async events

Posted by to...@apache.org.
adding async events


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/a63c8175
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/a63c8175
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/a63c8175

Branch: refs/heads/observable-query-fix
Commit: a63c8175d362054f49ce25de53dbbbdf402e2141
Parents: cbea6e0
Author: Shawn Feldman <sf...@apache.org>
Authored: Mon Jul 13 14:09:37 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Mon Jul 13 14:09:37 2015 -0600

----------------------------------------------------------------------
 .../corepersistence/CpEntityManagerFactory.java |  4 ++-
 .../asyncevents/AmazonAsyncEventService.java    | 35 ++++++++++++++++----
 .../asyncevents/AsyncEventService.java          |  1 -
 .../asyncevents/AsyncIndexProvider.java         | 14 ++++++--
 .../asyncevents/InMemoryAsyncEventService.java  |  6 ----
 .../asyncevents/model/AsyncEvent.java           |  4 +--
 .../model/InitializeApplicationIndexEvent.java  |  1 +
 .../model/InitializeManagementIndexEvent.java   | 32 ------------------
 .../index/AmazonAsyncEventServiceTest.java      | 10 +++++-
 9 files changed, 54 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a63c8175/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
index 048c558..f24891e 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
@@ -154,6 +154,7 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
     private void init() {
 
         EntityManager em = getEntityManager(getManagementAppId());
+        indexService.queueInitializeApplicationIndex(CpNamingUtils.getApplicationScope(getManagementAppId()));
 
         try {
             if ( em.getApplication() == null ) {
@@ -252,9 +253,10 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
         }
 
         getSetup().setupApplicationKeyspace( applicationId, appName );
+        indexService.queueInitializeApplicationIndex(CpNamingUtils.getApplicationScope(applicationId));
 
         if ( properties == null ) {
-            properties = new TreeMap<>( CASE_INSENSITIVE_ORDER );
+            properties = new TreeMap<>( CASE_INSENSITIVE_ORDER);
         }
         properties.put( PROPERTY_NAME, appName );
         EntityManager appEm = getEntityManager(applicationId);

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a63c8175/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index e15824f..50000df 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -29,7 +29,10 @@ import com.codahale.metrics.Histogram;
 import com.google.common.base.Preconditions;
 import org.apache.usergrid.corepersistence.CpEntityManager;
 import org.apache.usergrid.corepersistence.asyncevents.model.*;
+import org.apache.usergrid.corepersistence.index.IndexLocationStrategyFactory;
 import org.apache.usergrid.corepersistence.rx.impl.EdgeScope;
+import org.apache.usergrid.persistence.index.EntityIndex;
+import org.apache.usergrid.persistence.index.EntityIndexFactory;
 import org.apache.usergrid.persistence.index.IndexLocationStrategy;
 import org.apache.usergrid.utils.UUIDUtils;
 import org.slf4j.Logger;
@@ -77,6 +80,8 @@ public class AmazonAsyncEventService implements AsyncEventService {
     private final IndexProcessorFig indexProcessorFig;
     private final IndexService indexService;
     private final EntityCollectionManagerFactory entityCollectionManagerFactory;
+    private final IndexLocationStrategyFactory indexLocationStrategyFactory;
+    private final EntityIndexFactory entityIndexFactory;
 
     private final Timer readTimer;
     private final Timer writeTimer;
@@ -97,11 +102,15 @@ public class AmazonAsyncEventService implements AsyncEventService {
                                    final IndexProcessorFig indexProcessorFig,
                                    final MetricsFactory metricsFactory,
                                    final IndexService indexService,
-                                   final EntityCollectionManagerFactory entityCollectionManagerFactory
+                                   final EntityCollectionManagerFactory entityCollectionManagerFactory,
+                                   final IndexLocationStrategyFactory indexLocationStrategyFactory,
+                                   final EntityIndexFactory entityIndexFactory
     ) {
 
         this.indexService = indexService;
         this.entityCollectionManagerFactory = entityCollectionManagerFactory;
+        this.indexLocationStrategyFactory = indexLocationStrategyFactory;
+        this.entityIndexFactory = entityIndexFactory;
 
         final QueueScope queueScope = new QueueScopeImpl(QUEUE_NAME);
         this.queue = queueManagerFactory.getQueueManager(queueScope);
@@ -229,6 +238,10 @@ public class AmazonAsyncEventService implements AsyncEventService {
                         handleEntityIndexUpdate(message);
                         break;
 
+
+                    case APPLICATION_INDEX:
+                        handleInitializeApplicationIndex(message);
+
                     default:
                         logger.error("Unknown EventType: {}", event.getEventType());
 
@@ -242,13 +255,9 @@ public class AmazonAsyncEventService implements AsyncEventService {
 
     @Override
     public void queueInitializeApplicationIndex( final ApplicationScope applicationScope) {
-        offer( new InitializeApplicationIndexEvent( applicationScope  ) );
+        offer(new InitializeApplicationIndexEvent(applicationScope));
     }
 
-    @Override
-    public void queueInitializeManagementIndex() {
-        offer( new InitializeManagementIndexEvent( ) );
-    }
 
     @Override
     public void queueEntityIndexUpdate(final ApplicationScope applicationScope,
@@ -363,6 +372,20 @@ public class AmazonAsyncEventService implements AsyncEventService {
     }
 
 
+    public void handleInitializeApplicationIndex(final QueueMessage message) {
+        Preconditions.checkNotNull(message, "Queue Message cannot be null for handleInitializeApplicationIndex");
+
+        final AsyncEvent event = (AsyncEvent) message.getBody();
+        Preconditions.checkNotNull(message, "QueueMessage Body cannot be null for handleInitializeApplicationIndex");
+        Preconditions.checkArgument(event.getEventType() == AsyncEvent.EventType.APPLICATION_INDEX, String.format("Event Type for handleInitializeApplicationIndex must be APPLICATION_INDEX, got %s", event.getEventType()));
+
+        final ApplicationScope applicationScope = event.getApplicationScope();
+        final IndexLocationStrategy indexLocationStrategy = indexLocationStrategyFactory.getIndexLocationStrategy(applicationScope);
+        final EntityIndex index = entityIndexFactory.createEntityIndex(indexLocationStrategy);
+        index.initialize();
+        ack(message);
+    }
+
     /**
      * Loop through and start the workers
      */

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a63c8175/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
index 1c97c3e..a36a9ae 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
@@ -39,7 +39,6 @@ public interface AsyncEventService extends ReIndexAction {
      * @param applicationScope
      */
     void queueInitializeApplicationIndex( final ApplicationScope applicationScope );
-    void queueInitializeManagementIndex( );
 
     /**
      * Queue an entity to be indexed.  This will start processing immediately. For implementations that are realtime (akka, in memory)

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a63c8175/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
index f455f9c..0a58369 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncIndexProvider.java
@@ -20,11 +20,13 @@
 package org.apache.usergrid.corepersistence.asyncevents;
 
 
+import org.apache.usergrid.corepersistence.index.IndexLocationStrategyFactory;
 import org.apache.usergrid.corepersistence.index.IndexProcessorFig;
 import org.apache.usergrid.corepersistence.index.IndexService;
 import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
 import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
 import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
+import org.apache.usergrid.persistence.index.EntityIndexFactory;
 import org.apache.usergrid.persistence.queue.QueueManagerFactory;
 
 import com.google.inject.Inject;
@@ -46,6 +48,8 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> {
     private final RxTaskScheduler rxTaskScheduler;
     private final EntityCollectionManagerFactory entityCollectionManagerFactory;
     private final EventBuilder eventBuilder;
+    private final IndexLocationStrategyFactory indexLocationStrategyFactory;
+    private final EntityIndexFactory entityIndexFactory;
 
     private AsyncEventService asyncEventService;
 
@@ -57,7 +61,9 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> {
                               final IndexService indexService,
                               final RxTaskScheduler rxTaskScheduler,
                               final EntityCollectionManagerFactory entityCollectionManagerFactory,
-                              final EventBuilder eventBuilder) {
+                              final EventBuilder eventBuilder,
+                              final IndexLocationStrategyFactory indexLocationStrategyFactory,
+                              final EntityIndexFactory entityIndexFactory) {
 
         this.indexProcessorFig = indexProcessorFig;
         this.queueManagerFactory = queueManagerFactory;
@@ -66,6 +72,8 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> {
         this.rxTaskScheduler = rxTaskScheduler;
         this.entityCollectionManagerFactory = entityCollectionManagerFactory;
         this.eventBuilder = eventBuilder;
+        this.indexLocationStrategyFactory = indexLocationStrategyFactory;
+        this.entityIndexFactory = entityIndexFactory;
     }
 
 
@@ -90,10 +98,10 @@ public class AsyncIndexProvider implements Provider<AsyncEventService> {
                 return new InMemoryAsyncEventService(eventBuilder, rxTaskScheduler, indexProcessorFig.resolveSynchronously());
             case SQS:
                 return new AmazonAsyncEventService(queueManagerFactory, indexProcessorFig, metricsFactory, indexService,
-                    entityCollectionManagerFactory, rxTaskScheduler);
+                    entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory );
             case SNS:
                 return new AmazonAsyncEventService(queueManagerFactory, indexProcessorFig, metricsFactory, indexService,
-                    entityCollectionManagerFactory, rxTaskScheduler);
+                    entityCollectionManagerFactory, indexLocationStrategyFactory,entityIndexFactory );
             default:
                 throw new IllegalArgumentException("Configuration value of " + getErrorValues() + " are allowed");
         }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a63c8175/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
index 2fc0bb2..adb4a90 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
@@ -71,12 +71,6 @@ public class InMemoryAsyncEventService implements AsyncEventService {
     }
 
     @Override
-    public void queueInitializeManagementIndex() {
-
-    }
-
-
-    @Override
     public void queueEntityIndexUpdate( final ApplicationScope applicationScope, final Entity entity ) {
 
         //process the entity immediately

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a63c8175/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
index b331b6e..9278e0f 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
@@ -149,9 +149,7 @@ public class AsyncEvent implements Serializable {
         EDGE_INDEX,
         ENTITY_DELETE,
         ENTITY_INDEX,
-        APPLICATION_INDEX,
-        MANAGEMENT_INDEX;
-        ;
+        APPLICATION_INDEX;
 
 
         public String asString() {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a63c8175/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/InitializeApplicationIndexEvent.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/InitializeApplicationIndexEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/InitializeApplicationIndexEvent.java
index 6612e8b..656d820 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/InitializeApplicationIndexEvent.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/InitializeApplicationIndexEvent.java
@@ -28,6 +28,7 @@ import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 @JsonDeserialize(as = AsyncEvent.class)
 public class InitializeApplicationIndexEvent extends AsyncEvent {
     public InitializeApplicationIndexEvent() {
+        super(EventType.APPLICATION_INDEX);
     }
     public InitializeApplicationIndexEvent(final ApplicationScope applicationScope) {
         super(EventType.APPLICATION_INDEX, applicationScope);

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a63c8175/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/InitializeManagementIndexEvent.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/InitializeManagementIndexEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/InitializeManagementIndexEvent.java
deleted file mode 100644
index 0af249d..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/InitializeManagementIndexEvent.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  *  contributor license agreements.  The ASF licenses this file to You
- *  * under the Apache License, Version 2.0 (the "License"); you may not
- *  * use this file except in compliance with the License.
- *  * You may obtain a copy of the License at
- *  *
- *  *     http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.  For additional information regarding
- *  * copyright in this work, please see the NOTICE file in the top level
- *  * directory of this distribution.
- *
- */
-package org.apache.usergrid.corepersistence.asyncevents.model;
-
-import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
-
-/**
- * Event to initialize mgmt index
- */
-@JsonDeserialize(as = AsyncEvent.class)
-public class InitializeManagementIndexEvent extends AsyncEvent{
-    public InitializeManagementIndexEvent(){
-        super(EventType.MANAGEMENT_INDEX);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a63c8175/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
index 02f42b7..9cf896c 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java
@@ -20,6 +20,7 @@
 package org.apache.usergrid.corepersistence.index;
 
 
+import org.apache.usergrid.persistence.index.EntityIndexFactory;
 import org.junit.Rule;
 import org.junit.runner.RunWith;
 
@@ -71,10 +72,17 @@ public class AmazonAsyncEventServiceTest extends AsyncIndexServiceTest {
     public RxTaskScheduler rxTaskScheduler;
 
 
+    @Inject
+    public IndexLocationStrategyFactory indexLocationStrategyFactory;
+
+
+    @Inject
+    public EntityIndexFactory entityIndexFactory;
+
     @Override
     protected AsyncEventService getAsyncEventService() {
         return  new AmazonAsyncEventService( queueManagerFactory, indexProcessorFig, metricsFactory, indexService,
-                    entityCollectionManagerFactory, rxTaskScheduler );
+                    entityCollectionManagerFactory, indexLocationStrategyFactory, entityIndexFactory );
     }
 
 


[08/14] incubator-usergrid git commit: adding async events

Posted by to...@apache.org.
adding async events


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/09873cec
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/09873cec
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/09873cec

Branch: refs/heads/observable-query-fix
Commit: 09873cec76ac5f11cef8e5cf39c870e823693619
Parents: a63c817
Author: Shawn Feldman <sf...@apache.org>
Authored: Mon Jul 13 14:58:01 2015 -0600
Committer: Shawn Feldman <sf...@apache.org>
Committed: Mon Jul 13 14:58:01 2015 -0600

----------------------------------------------------------------------
 .../asyncevents/AmazonAsyncEventService.java    |  11 +-
 .../asyncevents/AsyncEventService.java          |   1 +
 .../asyncevents/model/AsyncEvent.java           |  15 ++-
 .../model/InitializeApplicationIndexEvent.java  |   5 +-
 .../index/ReplicatedIndexLocationStrategy.java  | 103 +++++++++++++++++++
 5 files changed, 124 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/09873cec/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
index 50000df..1390058 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java
@@ -29,7 +29,7 @@ import com.codahale.metrics.Histogram;
 import com.google.common.base.Preconditions;
 import org.apache.usergrid.corepersistence.CpEntityManager;
 import org.apache.usergrid.corepersistence.asyncevents.model.*;
-import org.apache.usergrid.corepersistence.index.IndexLocationStrategyFactory;
+import org.apache.usergrid.corepersistence.index.*;
 import org.apache.usergrid.corepersistence.rx.impl.EdgeScope;
 import org.apache.usergrid.persistence.index.EntityIndex;
 import org.apache.usergrid.persistence.index.EntityIndexFactory;
@@ -38,9 +38,6 @@ import org.apache.usergrid.utils.UUIDUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.usergrid.corepersistence.index.EntityIndexOperation;
-import org.apache.usergrid.corepersistence.index.IndexProcessorFig;
-import org.apache.usergrid.corepersistence.index.IndexService;
 import org.apache.usergrid.persistence.collection.EntityCollectionManager;
 import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
 import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
@@ -255,7 +252,8 @@ public class AmazonAsyncEventService implements AsyncEventService {
 
     @Override
     public void queueInitializeApplicationIndex( final ApplicationScope applicationScope) {
-        offer(new InitializeApplicationIndexEvent(applicationScope));
+        IndexLocationStrategy indexLocationStrategy = indexLocationStrategyFactory.getIndexLocationStrategy(applicationScope);
+        offer(new InitializeApplicationIndexEvent(new ReplicatedIndexLocationStrategy(indexLocationStrategy)));
     }
 
 
@@ -379,8 +377,7 @@ public class AmazonAsyncEventService implements AsyncEventService {
         Preconditions.checkNotNull(message, "QueueMessage Body cannot be null for handleInitializeApplicationIndex");
         Preconditions.checkArgument(event.getEventType() == AsyncEvent.EventType.APPLICATION_INDEX, String.format("Event Type for handleInitializeApplicationIndex must be APPLICATION_INDEX, got %s", event.getEventType()));
 
-        final ApplicationScope applicationScope = event.getApplicationScope();
-        final IndexLocationStrategy indexLocationStrategy = indexLocationStrategyFactory.getIndexLocationStrategy(applicationScope);
+        final IndexLocationStrategy indexLocationStrategy = event.getIndexLocationStrategy();
         final EntityIndex index = entityIndexFactory.createEntityIndex(indexLocationStrategy);
         index.initialize();
         ack(message);

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/09873cec/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
index a36a9ae..1a5e865 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventService.java
@@ -22,6 +22,7 @@ package org.apache.usergrid.corepersistence.asyncevents;
 
 import org.apache.usergrid.corepersistence.index.ReIndexAction;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.entities.Application;
 import org.apache.usergrid.persistence.graph.Edge;
 import org.apache.usergrid.persistence.index.IndexLocationStrategy;
 import org.apache.usergrid.persistence.model.entity.Entity;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/09873cec/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
index 9278e0f..3fabc1c 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java
@@ -25,6 +25,7 @@ import com.fasterxml.jackson.databind.annotation.JsonSerialize;
 import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.index.IndexLocationStrategy;
 import org.apache.usergrid.persistence.model.entity.Id;
 
 import java.io.Serializable;
@@ -37,6 +38,9 @@ import java.io.Serializable;
 public class AsyncEvent implements Serializable {
 
     @JsonProperty
+    protected IndexLocationStrategy indexLocationStrategy;
+
+    @JsonProperty
     protected EventType eventType;
 
     @JsonProperty
@@ -75,9 +79,9 @@ public class AsyncEvent implements Serializable {
         this.creationTime = System.currentTimeMillis();
     }
 
-    public AsyncEvent(EventType eventType, ApplicationScope applicationScope) {
+    public AsyncEvent(EventType eventType, IndexLocationStrategy indexLocationStrategy) {
         this.eventType = eventType;
-        this.applicationScope = applicationScope;
+        this.indexLocationStrategy = indexLocationStrategy;
         this.creationTime = System.currentTimeMillis();
     }
 
@@ -132,6 +136,13 @@ public class AsyncEvent implements Serializable {
         this.applicationScope = applicationScope;
     }
 
+    @JsonSerialize
+    public IndexLocationStrategy getIndexLocationStrategy() { return indexLocationStrategy; }
+
+    public void setIndexLocationStrategy( IndexLocationStrategy indexLocationStrategy ){
+        this.indexLocationStrategy = indexLocationStrategy;
+    }
+
     @JsonSerialize()
     public Edge getEdge() {
         return edge;

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/09873cec/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/InitializeApplicationIndexEvent.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/InitializeApplicationIndexEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/InitializeApplicationIndexEvent.java
index 656d820..8b20651 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/InitializeApplicationIndexEvent.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/InitializeApplicationIndexEvent.java
@@ -21,6 +21,7 @@ package org.apache.usergrid.corepersistence.asyncevents.model;
 
 import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.index.IndexLocationStrategy;
 
 /**
  * event to init app index
@@ -30,8 +31,8 @@ public class InitializeApplicationIndexEvent extends AsyncEvent {
     public InitializeApplicationIndexEvent() {
         super(EventType.APPLICATION_INDEX);
     }
-    public InitializeApplicationIndexEvent(final ApplicationScope applicationScope) {
-        super(EventType.APPLICATION_INDEX, applicationScope);
+    public InitializeApplicationIndexEvent(final IndexLocationStrategy indexLocationStrategy) {
+        super(EventType.APPLICATION_INDEX, indexLocationStrategy);
 
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/09873cec/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReplicatedIndexLocationStrategy.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReplicatedIndexLocationStrategy.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReplicatedIndexLocationStrategy.java
new file mode 100644
index 0000000..b404a78
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReplicatedIndexLocationStrategy.java
@@ -0,0 +1,103 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  *  contributor license agreements.  The ASF licenses this file to You
+ *  * under the Apache License, Version 2.0 (the "License"); you may not
+ *  * use this file except in compliance with the License.
+ *  * You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.  For additional information regarding
+ *  * copyright in this work, please see the NOTICE file in the top level
+ *  * directory of this distribution.
+ *
+ */
+package org.apache.usergrid.corepersistence.index;
+
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.index.IndexAlias;
+import org.apache.usergrid.persistence.index.IndexLocationStrategy;
+
+/**
+ * {@link IndexLocationStrategy} that stores its own copy of every value read from another strategy at construction.
+ */
+public class ReplicatedIndexLocationStrategy implements IndexLocationStrategy {
+
+    private ReplicatedIndexAlias alias;
+    private String rootName;
+    private String indexInitialName;
+    private ApplicationScope applicationScope;
+    private int numberShards;
+    private int numberReplicas;
+
+    public ReplicatedIndexLocationStrategy(){
+
+    }
+
+    public ReplicatedIndexLocationStrategy(IndexLocationStrategy indexLocationStrategy){
+        alias = new ReplicatedIndexAlias( indexLocationStrategy.getAlias() );
+        rootName = indexLocationStrategy.getIndexRootName();
+        indexInitialName = indexLocationStrategy.getIndexInitialName();
+        applicationScope = indexLocationStrategy.getApplicationScope();
+        numberShards = indexLocationStrategy.getNumberOfShards();
+        numberReplicas = indexLocationStrategy.getNumberOfReplicas();
+    }
+
+    @Override
+    public IndexAlias getAlias() {
+        return alias;
+    }
+
+    @Override
+    public String getIndexRootName() {
+        return rootName;
+    }
+
+    @Override
+    public String getIndexInitialName() {
+        return indexInitialName;
+    }
+
+    @Override
+    public ApplicationScope getApplicationScope() {
+        return applicationScope;
+    }
+
+    @Override
+    public int getNumberOfShards() {
+        return numberShards;
+    }
+
+    @Override
+    public int getNumberOfReplicas() {
+        return numberReplicas;
+    }
+
+    public static class ReplicatedIndexAlias implements IndexAlias{
+
+        private String readAlias;
+        private String writeAlias;
+
+        public ReplicatedIndexAlias(){
+
+        }
+        public ReplicatedIndexAlias(IndexAlias alias){
+            this.readAlias = alias.getReadAlias();
+            this.writeAlias = alias.getWriteAlias();
+        }
+        @Override
+        public String getReadAlias() {
+            return readAlias;
+        }
+
+        @Override
+        public String getWriteAlias() {
+            return writeAlias;
+        }
+    }
+}