You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by gr...@apache.org on 2015/05/21 01:00:31 UTC

[01/10] incubator-usergrid git commit: Update of Apis and tests

Repository: incubator-usergrid
Updated Branches:
  refs/heads/two-dot-o-dev b637535dd -> b28aeee05


Update of Apis and tests


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/dbfff997
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/dbfff997
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/dbfff997

Branch: refs/heads/two-dot-o-dev
Commit: dbfff9970f844f2264113fc05ee290aa3ca0ae50
Parents: 30e0e45
Author: Todd Nine <tn...@apigee.com>
Authored: Mon May 11 09:27:28 2015 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Tue May 12 10:29:41 2015 -0700

----------------------------------------------------------------------
 .../corepersistence/CpEntityManagerFactory.java | 14 ------
 .../corepersistence/index/ReIndexAction.java    |  4 ++
 .../corepersistence/index/ReIndexService.java   | 11 ++--
 .../index/ReIndexServiceImpl.java               |  5 +-
 .../persistence/EntityManagerFactory.java       |  2 -
 .../PerformanceEntityRebuildIndexTest.java      | 53 ++++++--------------
 6 files changed, 27 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/dbfff997/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
index e796545..13b6433 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
@@ -687,20 +687,6 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
         }
     }
 
-    @Override
-    public ReIndexService.IndexResponse rebuildCollectionIndex( Optional<UUID> appId, Optional<String> collection )   {
-        throw new UnsupportedOperationException( "Implement me" );
-//
-//        EntityManager em = getEntityManager( appId );
-//
-//        //explicitly invoke create index, we don't know if it exists or not in ES during a rebuild.
-//        Application app = em.getApplication();
-//
-//        em.reindexCollection(po, collectionName, reverse);
-//
-//        logger.info("\n\nRebuilt index for application {} id {} collection {}\n",
-//            new Object[]{app.getName(), appId, collectionName});
-    }
 
     @Override
     public void addIndex(final String indexSuffix,final int shards,final int replicas, final String writeConsistency){

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/dbfff997/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java
index 086b2aa..b878246 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java
@@ -29,5 +29,9 @@ import org.apache.usergrid.persistence.collection.serialization.impl.migration.E
 @FunctionalInterface
 public interface ReIndexAction {
 
+    /**
+     * Index this entity with the specified scope
+     * @param entityIdScope
+     */
     void index( final EntityIdScope entityIdScope );
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/dbfff997/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java
index 91409fe..e594ad3 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java
@@ -44,12 +44,15 @@ public interface ReIndexService {
 
     /**
      * Perform an index rebuild
-     * @param appId
-     * @param collection
+     *
+     * @param appId The applicationId to re-index, or all applications if absent
+     * @param collection The collection name to re-index.  Otherwise all collections in an app will be used.
+     * @param cursor An optional cursor to resume processing
+     * @param startTimestamp The time to start indexing from.  All edges >= this time will be indexed.
      * @return
      */
-    IndexResponse rebuildIndex( final Optional<UUID> appId, final Optional<String> collection, final Optional<String> collectionName, final Optional<String> cursor,
-                        final Optional<Long> startTimestamp );
+    IndexResponse rebuildIndex( final Optional<UUID> appId, final Optional<String> collection, final Optional<String> cursor,
+                        final Optional<Long> startTimestamp);
 
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/dbfff997/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
index be5bcab..bd1bff9 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
@@ -84,8 +84,7 @@ public class ReIndexServiceImpl implements ReIndexService {
 
 
     @Override
-    public IndexResponse rebuildIndex( final Optional<UUID> appId, final Optional<String> collection,
-                                       final Optional<String> collectionName, final Optional<String> cursor,
+    public IndexResponse rebuildIndex( final Optional<UUID> appId, final Optional<String> collection, final Optional<String> cursor,
                                        final Optional<Long> startTimestamp ) {
 
         //load our last emitted Scope if a cursor is present
@@ -100,7 +99,7 @@ public class ReIndexServiceImpl implements ReIndexService {
 
         //create an observable that loads each entity and indexes it, start it running with publish
         final ConnectableObservable<EdgeScope> runningReIndex =
-            allEntityIdsObservable.getEdgesToEntities( applicationScopes, collectionName, startTimestamp )
+            allEntityIdsObservable.getEdgesToEntities( applicationScopes, collection, startTimestamp )
 
                 //for each edge, create our scope and index on it
                 .doOnNext( edge -> indexService.index( new EntityIdScope( edge.getApplicationScope(), edge.getEdge().getTargetNode() ) ) ).publish();

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/dbfff997/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManagerFactory.java
index b3f4b62..cd7e515 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManagerFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManagerFactory.java
@@ -174,8 +174,6 @@ public interface EntityManagerFactory {
     /** For testing purposes */
     public void flushEntityManagerCaches();
 
-    ReIndexService.IndexResponse rebuildCollectionIndex( Optional<UUID> appId, Optional<String> collection );
-
     /**
      * Add a new index to the application for scale
      * @param suffix unique indentifier for additional index

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/dbfff997/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java b/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
index df2a762..a17c925 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
@@ -26,6 +26,8 @@ import java.util.concurrent.TimeUnit;
 
 import com.google.common.base.Optional;
 import org.apache.commons.lang.RandomStringUtils;
+
+import org.apache.usergrid.corepersistence.index.ReIndexService;
 import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
 import org.junit.After;
 import org.junit.Before;
@@ -63,9 +65,8 @@ public class PerformanceEntityRebuildIndexTest extends AbstractCoreIT {
     private static final MetricRegistry registry = new MetricRegistry();
     private Slf4jReporter reporter;
 
-    private static final long RUNTIME_MS = TimeUnit.SECONDS.toMillis( 10 );
+    private static final int ENTITIES_TO_INDEX = 2000;
 
-    private static final long WRITE_DELAY_MS = 10;
 
 
 
@@ -99,6 +100,8 @@ public class PerformanceEntityRebuildIndexTest extends AbstractCoreIT {
 
         final EntityManager em = setup.getEmf().getEntityManager( appId );
 
+        final ReIndexService reIndexService = setup.getInjector().getInstance( ReIndexService.class );
+
         // ----------------- create a bunch of entities
 
         Map<String, Object> entityMap = new HashMap<String, Object>() {{
@@ -107,37 +110,18 @@ public class PerformanceEntityRebuildIndexTest extends AbstractCoreIT {
             put("key3", "Some value");
         }};
 
-//        Map<String, Object> cat1map = new HashMap<String, Object>() {{
-//            put("name", "enzo");
-//            put("color", "orange");
-//        }};
-//        Map<String, Object> cat2map = new HashMap<String, Object>() {{
-//            put("name", "marquee");
-//            put("color", "grey");
-//        }};
-//        Map<String, Object> cat3map = new HashMap<String, Object>() {{
-//            put("name", "bertha");
-//            put("color", "tabby");
-//        }};
-//
-//        Entity cat1 = em.create("cat", cat1map );
-//        Entity cat2 = em.create("cat", cat2map );
-//        Entity cat3 = em.create("cat", cat3map );
-
-        final long stopTime = System.currentTimeMillis() + RUNTIME_MS;
 
         List<EntityRef> entityRefs = new ArrayList<EntityRef>();
-        int entityCount = 0;
         int herderCount  = 0;
         int shepardCount = 0;
-        while ( System.currentTimeMillis() < stopTime ) {
+        for (int i = 0; i < ENTITIES_TO_INDEX; i++) {
 
             final Entity entity;
 
             try {
-                entityMap.put("key", entityCount );
+                entityMap.put("key", i );
 
-                if ( entityCount % 2 == 0 ) {
+                if ( i % 2 == 0 ) {
                     entity = em.create("catherder", entityMap);
                     herderCount++;
                 } else {
@@ -156,15 +140,13 @@ public class PerformanceEntityRebuildIndexTest extends AbstractCoreIT {
             }
 
             entityRefs.add(new SimpleEntityRef( entity.getType(), entity.getUuid() ) );
-            if ( entityCount % 10 == 0 ) {
-                logger.info("Created {} entities", entityCount );
+            if ( i % 10 == 0 ) {
+                logger.info("Created {} entities", i );
             }
 
-            entityCount++;
-            try { Thread.sleep( WRITE_DELAY_MS ); } catch (InterruptedException ignored ) {}
         }
 
-        logger.info("Created {} entities", entityCount);
+        logger.info("Created {} entities", ENTITIES_TO_INDEX);
         app.refreshIndex();
 
         // ----------------- test that we can read them, should work fine
@@ -182,7 +164,7 @@ public class PerformanceEntityRebuildIndexTest extends AbstractCoreIT {
 
         logger.debug("Reading data, should fail this time ");
         try {
-            readData( em,  "testTypes", entityCount, 0 );
+            readData( em,  "testTypes", ENTITIES_TO_INDEX, 0 );
             fail("should have failed to read data");
 
         } catch (Exception expected) {}
@@ -214,10 +196,7 @@ public class PerformanceEntityRebuildIndexTest extends AbstractCoreIT {
 
         try {
 
-//            // do it forwards
-            setup.getEmf().rebuildCollectionIndex(Optional.of(em.getApplicationId()), Optional.<String>of("catherders"));
-//
-//            // and backwards, just to make sure both cases are covered
+            reIndexService.rebuildIndex( Optional.of( em.getApplicationId()), Optional.<String>of("catherders"), Optional.absent(), Optional.absent() );
 
             reporter.report();
             registry.remove( meterName );
@@ -269,11 +248,9 @@ public class PerformanceEntityRebuildIndexTest extends AbstractCoreIT {
         Entity cat2 = em.create("cat", cat2map );
         Entity cat3 = em.create("cat", cat3map );
 
-        final long stopTime = System.currentTimeMillis() + RUNTIME_MS;
-
         List<EntityRef> entityRefs = new ArrayList<EntityRef>();
         int entityCount = 0;
-        while ( System.currentTimeMillis() < stopTime ) {
+        for (int i = 0; i < ENTITIES_TO_INDEX; i++) {
 
             final Entity entity;
 
@@ -295,8 +272,6 @@ public class PerformanceEntityRebuildIndexTest extends AbstractCoreIT {
                 logger.info("Created {} entities", entityCount );
             }
 
-            entityCount++;
-            try { Thread.sleep( WRITE_DELAY_MS ); } catch (InterruptedException ignored ) {}
         }
 
         logger.info("Created {} entities", entityCount);


[02/10] incubator-usergrid git commit: Merge branch 'two-dot-o-dev' of https://git-wip-us.apache.org/repos/asf/incubator-usergrid into USERGRID-643

Posted by gr...@apache.org.
Merge branch 'two-dot-o-dev' of https://git-wip-us.apache.org/repos/asf/incubator-usergrid into USERGRID-643


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/b1d9ac28
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/b1d9ac28
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/b1d9ac28

Branch: refs/heads/two-dot-o-dev
Commit: b1d9ac283d1bf8ab17a5d9fd305b38447b5799d1
Parents: dbfff99 bc3cafb
Author: Todd Nine <tn...@apigee.com>
Authored: Tue May 12 10:43:32 2015 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Tue May 12 10:43:32 2015 -0700

----------------------------------------------------------------------
 .../usergrid/corepersistence/CpRelationManager.java    |  4 ++--
 .../java/org/apache/usergrid/persistence/Query.java    | 13 ++++---------
 .../usergrid/services/AbstractCollectionService.java   |  4 ++--
 .../org/apache/usergrid/services/ServiceRequest.java   |  2 +-
 4 files changed, 9 insertions(+), 14 deletions(-)
----------------------------------------------------------------------



[07/10] incubator-usergrid git commit: Updates REST api calls. POST is used to create a re-index job, PUT is used to resume.

Posted by gr...@apache.org.
Updates REST api calls.  POST is used to create a re-index job, PUT is used to resume.

Also fixes transitive dependency issue with clouds


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/5a7f9c09
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/5a7f9c09
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/5a7f9c09

Branch: refs/heads/two-dot-o-dev
Commit: 5a7f9c096b5d79950b5df658070ae0bd9bdc77da
Parents: 48be894
Author: Todd Nine <tn...@apigee.com>
Authored: Fri May 15 19:24:17 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Fri May 15 19:24:17 2015 -0600

----------------------------------------------------------------------
 .../index/IndexServiceRequestBuilder.java       |  88 -----
 .../index/IndexServiceRequestBuilderImpl.java   | 122 -------
 .../index/ReIndexRequestBuilder.java            |  86 +++++
 .../index/ReIndexRequestBuilderImpl.java        | 122 +++++++
 .../corepersistence/index/ReIndexService.java   |   6 +-
 .../index/ReIndexServiceImpl.java               |  14 +-
 .../PerformanceEntityRebuildIndexTest.java      |   6 +-
 .../persistence/index/impl/IndexingUtils.java   |   2 +
 stack/pom.xml                                   |  13 +
 .../org/apache/usergrid/rest/IndexResource.java | 342 +++++++++----------
 .../main/resources/usergrid-rest-context.xml    |   3 -
 .../resources/usergrid-rest-deploy-context.xml  |   1 -
 12 files changed, 401 insertions(+), 404 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5a7f9c09/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceRequestBuilder.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceRequestBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceRequestBuilder.java
deleted file mode 100644
index 07160d8..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceRequestBuilder.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.index;
-
-
-import java.util.UUID;
-
-import org.elasticsearch.action.index.IndexRequestBuilder;
-
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-
-import com.google.common.base.Optional;
-
-
-/**
- * A builder interface to build our re-index request
- */
-public interface IndexServiceRequestBuilder {
-
-    /**
-     * Set the application id
-     */
-    IndexServiceRequestBuilder withApplicationId( final UUID applicationId );
-
-    /**
-     * Set the collection name.  If not set, every collection will be reindexed
-     * @param collectionName
-     * @return
-     */
-    IndexServiceRequestBuilder withCollection( final String collectionName );
-
-    /**
-     * Set our cursor to resume processing
-     * @param cursor
-     * @return
-     */
-    IndexServiceRequestBuilder withCursor(final String cursor);
-
-
-    /**
-     * Set the timestamp to re-index entities updated >= this timestamp
-     * @param timestamp
-     * @return
-     */
-    IndexServiceRequestBuilder withStartTimestamp(final Long timestamp);
-
-
-    /**
-     * Get the application scope
-     * @return
-     */
-    Optional<ApplicationScope> getApplicationScope();
-
-    /**
-     * Get the collection name
-     * @return
-     */
-    Optional<String> getCollectionName();
-
-    /**
-     * Get the cursor
-     * @return
-     */
-    Optional<String> getCursor();
-
-    /**
-     * Get the updated since timestamp
-     * @return
-     */
-    Optional<Long> getUpdateTimestamp();
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5a7f9c09/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceRequestBuilderImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceRequestBuilderImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceRequestBuilderImpl.java
deleted file mode 100644
index 4017b6e..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceRequestBuilderImpl.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.index;
-
-
-import java.util.UUID;
-
-import org.apache.usergrid.corepersistence.util.CpNamingUtils;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-
-import com.google.common.base.Optional;
-
-
-/**
- * Index service request builder
- */
-public class IndexServiceRequestBuilderImpl implements IndexServiceRequestBuilder {
-
-    private Optional<UUID> withApplicationId = Optional.absent();
-    private Optional<String> withCollectionName = Optional.absent();
-    private Optional<String> cursor = Optional.absent();
-    private Optional<Long> updateTimestamp = Optional.absent();
-
-
-    /***
-     *
-     * @param applicationId The application id
-     * @return
-     */
-    @Override
-    public IndexServiceRequestBuilder withApplicationId( final UUID applicationId ) {
-        this.withApplicationId = Optional.fromNullable( applicationId );
-        return this;
-    }
-
-
-    /**
-     * the colleciton name
-     * @param collectionName
-     * @return
-     */
-    @Override
-    public IndexServiceRequestBuilder withCollection( final String collectionName ) {
-        if(collectionName == null){
-            this.withCollectionName = Optional.absent();
-        }
-        else {
-            this.withCollectionName = Optional.fromNullable( CpNamingUtils.getEdgeTypeFromCollectionName( collectionName ) );
-        }
-        return this;
-    }
-
-
-    /**
-     * The cursor
-     * @param cursor
-     * @return
-     */
-    @Override
-    public IndexServiceRequestBuilder withCursor( final String cursor ) {
-        this.cursor = Optional.fromNullable( cursor );
-        return this;
-    }
-
-
-    /**
-     * Set start timestamp in epoch time.  Only entities updated since this time will be processed for indexing
-     * @param timestamp
-     * @return
-     */
-    @Override
-    public IndexServiceRequestBuilder withStartTimestamp( final Long timestamp ) {
-        this.updateTimestamp = Optional.fromNullable( timestamp );
-        return this;
-    }
-
-
-    @Override
-    public Optional<ApplicationScope> getApplicationScope() {
-
-        if ( this.withApplicationId.isPresent() ) {
-            return Optional.of( CpNamingUtils.getApplicationScope( withApplicationId.get() ) );
-        }
-
-        return Optional.absent();
-    }
-
-
-    @Override
-    public Optional<String> getCollectionName() {
-        return withCollectionName;
-    }
-
-
-    @Override
-    public Optional<String> getCursor() {
-        return cursor;
-    }
-
-
-    @Override
-    public Optional<Long> getUpdateTimestamp() {
-        return updateTimestamp;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5a7f9c09/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexRequestBuilder.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexRequestBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexRequestBuilder.java
new file mode 100644
index 0000000..0863a63
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexRequestBuilder.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.index;
+
+
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+
+import com.google.common.base.Optional;
+
+
+/**
+ * A builder interface to build our re-index request
+ */
+public interface ReIndexRequestBuilder {
+
+    /**
+     * Set the application id
+     */
+    ReIndexRequestBuilder withApplicationId( final UUID applicationId );
+
+    /**
+     * Set the collection name.  If not set, every collection will be reindexed
+     * @param collectionName
+     * @return
+     */
+    ReIndexRequestBuilder withCollection( final String collectionName );
+
+    /**
+     * Set our cursor to resume processing
+     * @param cursor
+     * @return
+     */
+    ReIndexRequestBuilder withCursor(final String cursor);
+
+
+    /**
+     * Set the timestamp to re-index entities updated >= this timestamp
+     * @param timestamp
+     * @return
+     */
+    ReIndexRequestBuilder withStartTimestamp(final Long timestamp);
+
+
+    /**
+     * Get the application scope
+     * @return
+     */
+    Optional<ApplicationScope> getApplicationScope();
+
+    /**
+     * Get the collection name
+     * @return
+     */
+    Optional<String> getCollectionName();
+
+    /**
+     * Get the cursor
+     * @return
+     */
+    Optional<String> getCursor();
+
+    /**
+     * Get the updated since timestamp
+     * @return
+     */
+    Optional<Long> getUpdateTimestamp();
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5a7f9c09/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexRequestBuilderImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexRequestBuilderImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexRequestBuilderImpl.java
new file mode 100644
index 0000000..25e71e6
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexRequestBuilderImpl.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.index;
+
+
+import java.util.UUID;
+
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+
+import com.google.common.base.Optional;
+
+
+/**
+ * Index service request builder
+ */
+public class ReIndexRequestBuilderImpl implements ReIndexRequestBuilder {
+
+    private Optional<UUID> withApplicationId = Optional.absent();
+    private Optional<String> withCollectionName = Optional.absent();
+    private Optional<String> cursor = Optional.absent();
+    private Optional<Long> updateTimestamp = Optional.absent();
+
+
+    /***
+     *
+     * @param applicationId The application id
+     * @return
+     */
+    @Override
+    public ReIndexRequestBuilder withApplicationId( final UUID applicationId ) {
+        this.withApplicationId = Optional.fromNullable( applicationId );
+        return this;
+    }
+
+
+    /**
+     * the colleciton name
+     * @param collectionName
+     * @return
+     */
+    @Override
+    public ReIndexRequestBuilder withCollection( final String collectionName ) {
+        if(collectionName == null){
+            this.withCollectionName = Optional.absent();
+        }
+        else {
+            this.withCollectionName = Optional.fromNullable( CpNamingUtils.getEdgeTypeFromCollectionName( collectionName ) );
+        }
+        return this;
+    }
+
+
+    /**
+     * The cursor
+     * @param cursor
+     * @return
+     */
+    @Override
+    public ReIndexRequestBuilder withCursor( final String cursor ) {
+        this.cursor = Optional.fromNullable( cursor );
+        return this;
+    }
+
+
+    /**
+     * Set start timestamp in epoch time.  Only entities updated since this time will be processed for indexing
+     * @param timestamp
+     * @return
+     */
+    @Override
+    public ReIndexRequestBuilder withStartTimestamp( final Long timestamp ) {
+        this.updateTimestamp = Optional.fromNullable( timestamp );
+        return this;
+    }
+
+
+    @Override
+    public Optional<ApplicationScope> getApplicationScope() {
+
+        if ( this.withApplicationId.isPresent() ) {
+            return Optional.of( CpNamingUtils.getApplicationScope( withApplicationId.get() ) );
+        }
+
+        return Optional.absent();
+    }
+
+
+    @Override
+    public Optional<String> getCollectionName() {
+        return withCollectionName;
+    }
+
+
+    @Override
+    public Optional<String> getCursor() {
+        return cursor;
+    }
+
+
+    @Override
+    public Optional<Long> getUpdateTimestamp() {
+        return updateTimestamp;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5a7f9c09/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java
index af3615e..bae8d1f 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java
@@ -29,15 +29,15 @@ public interface ReIndexService {
     /**
      * Perform an index rebuild
      *
-     * @param indexServiceRequestBuilder The builder to build the request
+     * @param reIndexRequestBuilder The builder to build the request
      */
-    ReIndexStatus rebuildIndex( final IndexServiceRequestBuilder indexServiceRequestBuilder );
+    ReIndexStatus rebuildIndex( final ReIndexRequestBuilder reIndexRequestBuilder );
 
 
     /**
      * Generate a build for the index
      */
-    IndexServiceRequestBuilder getBuilder();
+    ReIndexRequestBuilder getBuilder();
 
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5a7f9c09/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
index f44113b..ef03866 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
@@ -91,16 +91,16 @@ public class ReIndexServiceImpl implements ReIndexService {
 
 
     @Override
-    public ReIndexStatus rebuildIndex( final IndexServiceRequestBuilder indexServiceRequestBuilder ) {
+    public ReIndexStatus rebuildIndex( final ReIndexRequestBuilder reIndexRequestBuilder ) {
 
         //load our last emitted Scope if a cursor is present
 
-        final Optional<EdgeScope> cursor = parseCursor( indexServiceRequestBuilder.getCursor() );
+        final Optional<EdgeScope> cursor = parseCursor( reIndexRequestBuilder.getCursor() );
 
 
         final CursorSeek<Edge> cursorSeek = getResumeEdge( cursor );
 
-        final Optional<ApplicationScope> appId = indexServiceRequestBuilder.getApplicationScope();
+        final Optional<ApplicationScope> appId = reIndexRequestBuilder.getApplicationScope();
 
 
         Preconditions.checkArgument( !(cursor.isPresent() && appId.isPresent()),
@@ -111,11 +111,11 @@ public class ReIndexServiceImpl implements ReIndexService {
 
         final String jobId = StringUtils.sanitizeUUID( UUIDGenerator.newTimeUUID() );
 
-        final long modifiedSince = indexServiceRequestBuilder.getUpdateTimestamp().or( Long.MIN_VALUE );
+        final long modifiedSince = reIndexRequestBuilder.getUpdateTimestamp().or( Long.MIN_VALUE );
 
         //create an observable that loads each entity and indexes it, start it running with publish
         final Observable<EdgeScope> runningReIndex = allEntityIdsObservable.getEdgesToEntities( applicationScopes,
-            indexServiceRequestBuilder.getCollectionName(), cursorSeek.getSeekValue() )
+            reIndexRequestBuilder.getCollectionName(), cursorSeek.getSeekValue() )
 
             //for each edge, create our scope and index on it
             .doOnNext( edge -> {
@@ -143,8 +143,8 @@ public class ReIndexServiceImpl implements ReIndexService {
 
 
     @Override
-    public IndexServiceRequestBuilder getBuilder() {
-        return new IndexServiceRequestBuilderImpl();
+    public ReIndexRequestBuilder getBuilder() {
+        return new ReIndexRequestBuilderImpl();
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5a7f9c09/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java b/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
index 8d54043..06eb060 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
@@ -33,7 +33,7 @@ import org.apache.commons.lang.RandomStringUtils;
 
 import org.apache.usergrid.AbstractCoreIT;
 import org.apache.usergrid.cassandra.SpringResource;
-import org.apache.usergrid.corepersistence.index.IndexServiceRequestBuilder;
+import org.apache.usergrid.corepersistence.index.ReIndexRequestBuilder;
 import org.apache.usergrid.corepersistence.index.ReIndexService;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
@@ -153,7 +153,7 @@ public class PerformanceEntityRebuildIndexTest extends AbstractCoreIT {
         logger.debug( "Preparing to rebuild all indexes" );
 
 
-        final IndexServiceRequestBuilder builder =
+        final ReIndexRequestBuilder builder =
             reIndexService.getBuilder().withApplicationId( em.getApplicationId() ).withCollection( "catherders" );
 
         ReIndexService.ReIndexStatus status = reIndexService.rebuildIndex( builder );
@@ -270,7 +270,7 @@ public class PerformanceEntityRebuildIndexTest extends AbstractCoreIT {
 
         try {
 
-            final IndexServiceRequestBuilder builder =
+            final ReIndexRequestBuilder builder =
                 reIndexService.getBuilder().withApplicationId( em.getApplicationId() );
 
             ReIndexService.ReIndexStatus status = reIndexService.rebuildIndex( builder );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5a7f9c09/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexingUtils.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexingUtils.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexingUtils.java
index 8b248aa..bc15149 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexingUtils.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexingUtils.java
@@ -99,6 +99,8 @@ public class IndexingUtils {
 
     /**
      * Create our sub scope.  This is the ownerUUID + type
+     *
+     * TODO make this format more readable and parsable
      */
     public static String createContextName( final ApplicationScope applicationScope, final SearchEdge scope ) {
         StringBuilder sb = new StringBuilder();

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5a7f9c09/stack/pom.xml
----------------------------------------------------------------------
diff --git a/stack/pom.xml b/stack/pom.xml
index 762607d..4559db2 100644
--- a/stack/pom.xml
+++ b/stack/pom.xml
@@ -1241,6 +1241,13 @@
         <groupId>org.apache.jclouds</groupId>
         <artifactId>jclouds-blobstore</artifactId>
         <version>${jclouds.version}</version>
+          <exclusions>
+              <!-- blows up our version of guice-->
+                          <exclusion>
+                              <groupId>com.google.inject.extensions</groupId>
+                              <artifactId>guice-assistedinject</artifactId>
+                          </exclusion>
+          </exclusions>
       </dependency>
 
       <dependency>
@@ -1262,6 +1269,12 @@
             <groupId>aopalliance</groupId>
             <artifactId>aopalliance</artifactId>
           </exclusion>
+
+            <!-- blows up our version of guice-->
+            <exclusion>
+                <groupId>com.google.inject.extensions</groupId>
+                <artifactId>guice-assistedinject</artifactId>
+            </exclusion>
         </exclusions>
       </dependency>
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5a7f9c09/stack/rest/src/main/java/org/apache/usergrid/rest/IndexResource.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/IndexResource.java b/stack/rest/src/main/java/org/apache/usergrid/rest/IndexResource.java
index 668c05c..ffb9700 100644
--- a/stack/rest/src/main/java/org/apache/usergrid/rest/IndexResource.java
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/IndexResource.java
@@ -20,25 +20,34 @@
 
 package org.apache.usergrid.rest;
 
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.sun.jersey.api.json.JSONWithPadding;
-import org.apache.usergrid.persistence.EntityManagerFactory;
-import org.apache.usergrid.persistence.EntityRef;
-import org.apache.usergrid.persistence.index.utils.UUIDUtils;
-import org.apache.usergrid.rest.security.annotations.RequireSystemAccess;
+
+import java.util.Map;
+import java.util.UUID;
+
+import javax.ws.rs.DefaultValue;
+import javax.ws.rs.POST;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.UriInfo;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.context.annotation.Scope;
 import org.springframework.stereotype.Component;
 
-import javax.ws.rs.*;
-import javax.ws.rs.core.Context;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.UriInfo;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
+import org.apache.usergrid.corepersistence.index.ReIndexRequestBuilder;
+import org.apache.usergrid.corepersistence.index.ReIndexService;
+import org.apache.usergrid.persistence.index.utils.UUIDUtils;
+import org.apache.usergrid.rest.security.annotations.RequireSystemAccess;
+
+import com.google.common.base.Preconditions;
+import com.sun.jersey.api.json.JSONWithPadding;
+
 
 /**
  * Classy class class.
@@ -46,260 +55,239 @@ import java.util.UUID;
 @Component
 @Scope( "singleton" )
 @Produces( {
-        MediaType.APPLICATION_JSON, "application/javascript", "application/x-javascript", "text/ecmascript",
-        "application/ecmascript", "text/jscript"
+    MediaType.APPLICATION_JSON, "application/javascript", "application/x-javascript", "text/ecmascript",
+    "application/ecmascript", "text/jscript"
 } )
 public class IndexResource extends AbstractContextResource {
 
-    private static final Logger logger = LoggerFactory.getLogger(IndexResource.class);
+    private static final Logger logger = LoggerFactory.getLogger( IndexResource.class );
+    private static final String UPDATED_FIELD = "updated";
 
-    public IndexResource(){
+
+    public IndexResource() {
         super();
     }
 
+
     @RequireSystemAccess
-    @PUT
+    @POST
     @Path( "rebuild" )
-    public JSONWithPadding rebuildIndexes( @Context UriInfo ui,
-                                           @QueryParam( "callback" ) @DefaultValue( "callback" ) String callback )
-            throws Exception {
+    public JSONWithPadding rebuildIndexesPost( @QueryParam( "callback" ) @DefaultValue( "callback" ) String callback )
+        throws Exception {
 
-        ApiResponse response = createApiResponse();
-        response.setAction( "rebuild indexes" );
 
+        logger.info( "Rebuilding all applications" );
 
-        final EntityManagerFactory.ProgressObserver po = new EntityManagerFactory.ProgressObserver() {
+        final ReIndexRequestBuilder request = createRequest();
 
+        return executeAndCreateResponse( request, callback );
+    }
 
-            @Override
-            public void onProgress( final EntityRef entity ) {
-                logger.info( "Indexing entity {}:{} ", entity.getType(), entity.getUuid() );
-            }
 
-        };
+    @RequireSystemAccess
+    @PUT
+    @Path( "rebuild" )
+    public JSONWithPadding rebuildIndexesPut( final Map<String, Object> payload,
+                                              @QueryParam( "callback" ) @DefaultValue( "callback" ) String callback )
+        throws Exception {
 
 
-        final Thread rebuild = new Thread() {
+        logger.info( "Resuming rebuilding all applications" );
+        final ReIndexRequestBuilder request = createRequest();
 
-            @Override
-            public void run() {
-                logger.info( "Rebuilding all indexes" );
+        return executeResumeAndCreateResponse( payload, request, callback );
+    }
 
-                try {
-                    emf.rebuildAllIndexes( po );
-                }
-                catch ( Exception e ) {
-                    logger.error( "Unable to rebuild indexes", e );
-                }
 
-                logger.info( "Completed all indexes" );
-            }
-        };
+    @RequireSystemAccess
+    @POST
+    @Path( "rebuild/" + RootResource.APPLICATION_ID_PATH )
+    public JSONWithPadding rebuildIndexesPut( @PathParam( "applicationId" ) String applicationIdStr,
+                                              @QueryParam( "callback" ) @DefaultValue( "callback" ) String callback,
+                                              @QueryParam( "delay" ) @DefaultValue( "10" ) final long delay )
 
-        rebuild.setName( "Index rebuild all usergrid" );
-        rebuild.setDaemon( true );
-        rebuild.start();
+        throws Exception {
 
 
-        response.setSuccess();
+        logger.info( "Rebuilding application {}", applicationIdStr );
 
-        return new JSONWithPadding( response, callback );
+
+        final UUID appId = UUIDUtils.tryExtractUUID( applicationIdStr );
+
+        final ReIndexRequestBuilder request = createRequest().withApplicationId( appId );
+
+        return executeAndCreateResponse( request, callback );
     }
 
 
     @RequireSystemAccess
     @PUT
     @Path( "rebuild/" + RootResource.APPLICATION_ID_PATH )
-    public JSONWithPadding rebuildIndexes( @Context UriInfo ui, @PathParam( "applicationId" ) String applicationIdStr,
-                                           @QueryParam( "callback" ) @DefaultValue( "callback" ) String callback,
-                                           @QueryParam( "delay" ) @DefaultValue( "10" ) final long delay )
-
-            throws Exception {
-
-        final UUID appId = UUIDUtils.tryExtractUUID(applicationIdStr);
-        ApiResponse response = createApiResponse();
-        response.setAction( "rebuild indexes started" );
+    public JSONWithPadding rebuildIndexesPut( final Map<String, Object> payload,
+                                              @PathParam( "applicationId" ) String applicationIdStr,
+                                              @QueryParam( "callback" ) @DefaultValue( "callback" ) String callback,
+                                              @QueryParam( "delay" ) @DefaultValue( "10" ) final long delay )
 
-        final EntityManagerFactory.ProgressObserver po = new EntityManagerFactory.ProgressObserver() {
+        throws Exception {
 
-            @Override
-            public void onProgress( final EntityRef entity ) {
-                logger.info( "Indexing entity {}:{}", entity.getType(), entity.getUuid() );
-            }
+        logger.info( "Resuming rebuilding application {}", applicationIdStr );
 
+        final UUID appId = UUIDUtils.tryExtractUUID( applicationIdStr );
 
-        };
-
-
-        final Thread rebuild = new Thread() {
-
-            @Override
-            public void run() {
-
+        final ReIndexRequestBuilder request = createRequest().withApplicationId( appId );
 
-                logger.info( "Started rebuilding application {} in collection ", appId );
+        return executeResumeAndCreateResponse( payload, request, callback );
+    }
 
 
-                try {
-                    emf.rebuildApplicationIndexes( appId, po );
-                }
-                catch ( Exception e ) {
-                    logger.error( "Unable to re-index application", e );
-                }
+    @RequireSystemAccess
+    @POST
+    @Path( "rebuild/" + RootResource.APPLICATION_ID_PATH + "/{collectionName}" )
+    public JSONWithPadding rebuildIndexesPost( @PathParam( "applicationId" ) final String applicationIdStr,
+                                               @PathParam( "collectionName" ) final String collectionName,
+                                               @QueryParam( "reverse" ) @DefaultValue( "false" ) final Boolean reverse,
+                                               @QueryParam( "callback" ) @DefaultValue( "callback" ) String callback )
+        throws Exception {
 
 
-                logger.info( "Completed rebuilding application {} in collection ", appId );
-            }
-        };
+        logger.info( "Rebuilding collection {} in  application {}", collectionName, applicationIdStr );
 
-        rebuild.setName( String.format( "Index rebuild for app %s", appId ) );
-        rebuild.setDaemon( true );
-        rebuild.start();
+        final UUID appId = UUIDUtils.tryExtractUUID( applicationIdStr );
 
-        response.setSuccess();
+        final ReIndexRequestBuilder request =
+            createRequest().withApplicationId( appId ).withCollection( collectionName );
 
-        return new JSONWithPadding( response, callback );
+        return executeAndCreateResponse( request, callback );
     }
 
 
     @RequireSystemAccess
     @PUT
     @Path( "rebuild/" + RootResource.APPLICATION_ID_PATH + "/{collectionName}" )
-    public JSONWithPadding rebuildIndexes(
-        @Context UriInfo ui,
-        @PathParam( "applicationId" ) final String applicationIdStr,
-        @PathParam( "collectionName" ) final String collectionName,
-        @QueryParam( "reverse" ) @DefaultValue( "false" ) final Boolean reverse,
-        @QueryParam( "callback" ) @DefaultValue( "callback" ) String callback) throws Exception {
+    public JSONWithPadding rebuildIndexesPut( final Map<String, Object> payload,
+                                              @PathParam( "applicationId" ) final String applicationIdStr,
+                                              @PathParam( "collectionName" ) final String collectionName,
+                                              @QueryParam( "reverse" ) @DefaultValue( "false" ) final Boolean reverse,
+                                              @QueryParam( "callback" ) @DefaultValue( "callback" ) String callback )
+        throws Exception {
 
-        final UUID appId = UUIDUtils.tryExtractUUID( applicationIdStr );
-        ApiResponse response = createApiResponse();
-        response.setAction( "rebuild indexes" );
+        logger.info( "Resuming rebuilding collection {} in  application {}", collectionName, applicationIdStr );
 
-        final Thread rebuild = new Thread() {
+        final UUID appId = UUIDUtils.tryExtractUUID( applicationIdStr );
 
-            public void run() {
+        final ReIndexRequestBuilder request =
+            createRequest().withApplicationId( appId ).withCollection( collectionName );
 
-                logger.info( "Started rebuilding application {} in collection {}", appId, collectionName );
+        return executeResumeAndCreateResponse( payload, request, callback );
+    }
 
-                try {
-                    rebuildCollection( appId, collectionName, reverse );
-                } catch (Exception e) {
 
-                    // TODO: handle this in rebuildCollection() instead
-                    throw new RuntimeException("Error rebuilding collection");
-                }
+    @RequireSystemAccess
+    @POST
+    @Path( "rebuild/management" )
+    public JSONWithPadding rebuildInternalIndexesPost(
+        @QueryParam( "callback" ) @DefaultValue( "callback" ) String callback ) throws Exception {
 
-                logger.info( "Completed rebuilding application {} in collection {}", appId, collectionName );
-            }
-        };
 
-        rebuild.setName( String.format( "Index rebuild for app %s and collection %s", appId, collectionName ) );
-        rebuild.setDaemon( true );
-        rebuild.start();
+        final UUID managementAppId = emf.getManagementAppId();
 
-        response.setSuccess();
+        logger.info( "Rebuilding management application with id {} ", managementAppId );
+        final ReIndexRequestBuilder request = createRequest().withApplicationId( managementAppId );
 
-        return new JSONWithPadding( response, callback );
+        return executeAndCreateResponse( request, callback );
     }
 
+
     @RequireSystemAccess
-    @PUT
-    @Path( "rebuildinternal" )
-    public JSONWithPadding rebuildInternalIndexes(
-        @Context UriInfo ui,
-        @PathParam( "applicationId" ) String applicationIdStr,
-        @QueryParam( "callback" ) @DefaultValue( "callback" ) String callback,
-        @QueryParam( "delay" ) @DefaultValue( "10" ) final long delay )  throws Exception {
+    @POST
+    @Path( "rebuild/management" )
+    public JSONWithPadding rebuildInternalIndexesPut( final Map<String, Object> payload,
+                                                      @QueryParam( "callback" ) @DefaultValue( "callback" )
+                                                      String callback ) throws Exception {
 
 
-        final UUID appId = UUIDUtils.tryExtractUUID(applicationIdStr);
-        ApiResponse response = createApiResponse();
-        response.setAction( "rebuild indexes started" );
+        final UUID managementAppId = emf.getManagementAppId();
 
-        final EntityManagerFactory.ProgressObserver po = new EntityManagerFactory.ProgressObserver() {
+        logger.info( "Resuming rebuilding management application with id {} ", managementAppId );
+        final ReIndexRequestBuilder request = createRequest().withApplicationId( managementAppId );
 
-            @Override
-            public void onProgress( final EntityRef entity ) {
-                logger.info( "Indexing entity {}:{}", entity.getType(), entity.getUuid() );
-            }
+        return executeResumeAndCreateResponse( payload, request, callback );
+    }
 
-        };
 
-        final Thread rebuild = new Thread() {
+    @RequireSystemAccess
+    @POST
+    public JSONWithPadding addIndex( @Context UriInfo ui, Map<String, Object> config,
+                                     @QueryParam( "callback" ) @DefaultValue( "callback" ) String callback )
+        throws Exception {
 
-            @Override
-            public void run() {
+        Preconditions
+            .checkNotNull( config, "Payload for config is null, please pass {replicas:int, shards:int} in body" );
 
-                logger.info( "Started rebuilding internal indexes", appId );
+        ApiResponse response = createApiResponse();
 
-                try {
-                    emf.rebuildInternalIndexes( po );
-                }
-                catch ( Exception e ) {
-                    logger.error( "Unable to re-index internals", e );
-                }
+        if ( !config.containsKey( "replicas" ) || !config.containsKey( "shards" ) ||
+            !( config.get( "replicas" ) instanceof Integer ) || !( config.get( "shards" ) instanceof Integer ) ) {
+            throw new IllegalArgumentException( "body must contains 'replicas' of type int and 'shards' of type int" );
+        }
 
-                logger.info( "Completed rebuilding internal indexes" );
-            }
-        };
+        if ( !config.containsKey( "indexSuffix" ) ) {
+            throw new IllegalArgumentException( "Please add an indexSuffix to your post" );
+        }
 
-        rebuild.setName( String.format( "Index rebuild for app %s", appId ) );
-        rebuild.setDaemon( true );
-        rebuild.start();
 
-        response.setSuccess();
+        emf.addIndex( config.get( "indexSuffix" ).toString(), ( int ) config.get( "shards" ),
+            ( int ) config.get( "replicas" ), ( String ) config.get( "writeConsistency" ) );
+        response.setAction( "Add index to alias" );
 
         return new JSONWithPadding( response, callback );
     }
 
-    @RequireSystemAccess
-    @POST
-    @Path( RootResource.APPLICATION_ID_PATH )
-    public JSONWithPadding addIndex(@Context UriInfo ui,
-            Map<String, Object> config,
-            @QueryParam( "callback" ) @DefaultValue( "callback" ) String callback)  throws Exception{
 
-        Preconditions.checkNotNull(config,"Payload for config is null, please pass {replicas:int, shards:int} in body");
+    private ReIndexService getReIndexService() {
+        return injector.getInstance( ReIndexService.class );
+    }
 
-        ApiResponse response = createApiResponse();
 
-        if (!config.containsKey("replicas") || !config.containsKey("shards") ||
-                !(config.get("replicas") instanceof Integer) || !(config.get("shards") instanceof Integer)){
-            throw new IllegalArgumentException("body must contains 'replicas' of type int and 'shards' of type int");
-        }
+    private ReIndexRequestBuilder createRequest() {
+        return createRequest();
+    }
 
-        if(!config.containsKey("indexSuffix")) {
-            throw new IllegalArgumentException("Please add an indexSuffix to your post");
-        }
 
+    private JSONWithPadding executeResumeAndCreateResponse( final Map<String, Object> payload,
+                                                            final ReIndexRequestBuilder request,
+                                                            final String callback ) {
 
-        emf.addIndex( config.get("indexSuffix").toString(),
-            (int) config.get("shards"),(int) config.get("replicas"),(String)config.get("writeConsistency"));
-        response.setAction("Add index to alias");
+        Preconditions.checkArgument( payload.containsKey( UPDATED_FIELD ),
+            "You must specified the field \"updated\" in the payload" );
 
-        return new JSONWithPadding(response, callback);
+        //add our updated timestamp to the request
+        if ( !payload.containsKey( UPDATED_FIELD ) ) {
+            final long timestamp = ( long ) payload.get( UPDATED_FIELD );
+            request.withStartTimestamp( timestamp );
+        }
 
+        return executeAndCreateResponse( request, callback );
     }
 
-    private void rebuildCollection(
-        final UUID applicationId,
-        final String collectionName,
-        final boolean reverse) throws Exception {
 
-        EntityManagerFactory.ProgressObserver po = new EntityManagerFactory.ProgressObserver() {
+    /**
+     * Execute the request and return the response.
+     */
+    private JSONWithPadding executeAndCreateResponse( final ReIndexRequestBuilder request, final String callback ) {
 
-            @Override
-            public void onProgress( final EntityRef entity ) {
-                logger.info( "Indexing entity {}:{}", entity.getType(), entity.getUuid() );
-            }
 
-        };
+        final ReIndexService.ReIndexStatus status = getReIndexService().rebuildIndex( request );
 
-        logger.info( "Reindexing for app id: {} and collection {}", applicationId, collectionName );
+        final ApiResponse response = createApiResponse();
 
-        emf.rebuildCollectionIndex(Optional.of(applicationId),Optional.of(collectionName));
-        getEntityIndex().refreshAsync().toBlocking().first();
-    }
+        response.setAction( "rebuild indexes" );
+        response.setProperty( "jobId", status.getJobId() );
+        response.setProperty( "status", status.getStatus() );
+        response.setProperty( "lastUpdatedEpoch", status.getLastUpdated() );
+        response.setProperty( "numberQueued", status.getNumberProcessed() );
+        response.setSuccess();
 
+        return new JSONWithPadding( response, callback );
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5a7f9c09/stack/rest/src/main/resources/usergrid-rest-context.xml
----------------------------------------------------------------------
diff --git a/stack/rest/src/main/resources/usergrid-rest-context.xml b/stack/rest/src/main/resources/usergrid-rest-context.xml
index 5c63e72..16f4541 100644
--- a/stack/rest/src/main/resources/usergrid-rest-context.xml
+++ b/stack/rest/src/main/resources/usergrid-rest-context.xml
@@ -47,9 +47,6 @@
 		<property name="securityManager" ref="securityManager" />
 	</bean>
 
-	<bean id="mongoServer" class="org.apache.usergrid.mongo.MongoServer"
-		init-method="startServer" destroy-method="stopServer" />
-
 	<!-- override the security manager -->
 	<bean id="securityManager" class="org.apache.usergrid.rest.security.shiro.RestSecurityManager">
 		<property name="cacheManager" ref="cacheManager" />

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5a7f9c09/stack/rest/src/main/resources/usergrid-rest-deploy-context.xml
----------------------------------------------------------------------
diff --git a/stack/rest/src/main/resources/usergrid-rest-deploy-context.xml b/stack/rest/src/main/resources/usergrid-rest-deploy-context.xml
index 9965dbc..e9b7ccd 100644
--- a/stack/rest/src/main/resources/usergrid-rest-deploy-context.xml
+++ b/stack/rest/src/main/resources/usergrid-rest-deploy-context.xml
@@ -33,7 +33,6 @@
 		<property name="locations">
 			<list>
 				<value>classpath:/usergrid-default.properties</value>
-                <value>${usergrid-custom-spring-properties}</value>
 			</list>
 		</property>
 	</bean>


[09/10] incubator-usergrid git commit: Addresses some of George's comments

Posted by gr...@apache.org.
Addresses some of George's comments


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/53aa87ca
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/53aa87ca
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/53aa87ca

Branch: refs/heads/two-dot-o-dev
Commit: 53aa87cad41b6d7320c24af4a5e71061fe2a4dd3
Parents: 8fde7c5
Author: Todd Nine <tn...@apigee.com>
Authored: Wed May 20 15:04:09 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Wed May 20 15:04:09 2015 -0600

----------------------------------------------------------------------
 .../usergrid/persistence/PerformanceEntityRebuildIndexTest.java    | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/53aa87ca/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java b/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
index 06eb060..318a378 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
@@ -314,7 +314,7 @@ public class PerformanceEntityRebuildIndexTest extends AbstractCoreIT {
                 }
             }
             catch ( IllegalArgumentException iae ) {
-                //swallow
+                //swallow.  Thrown if our job can't be found.  I.E hasn't updated yet
             }
 
 


[08/10] incubator-usergrid git commit: Fixes exception on missing keyspace during bootstrap

Posted by gr...@apache.org.
Fixes exception on missing keyspace during bootstrap


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/8fde7c5d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/8fde7c5d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/8fde7c5d

Branch: refs/heads/two-dot-o-dev
Commit: 8fde7c5dfd7d06c17b0a2c2a3ac1053bf3d97e76
Parents: 5a7f9c0
Author: Todd Nine <tn...@apigee.com>
Authored: Fri May 15 19:38:19 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Fri May 15 19:38:19 2015 -0600

----------------------------------------------------------------------
 .../data/MigrationInfoSerializationImpl.java    |  4 +-
 .../migration/schema/MigrationManagerImpl.java  | 14 ++----
 .../core/migration/util/AstayanxUtils.java      | 49 ++++++++++++++++++++
 3 files changed, 56 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8fde7c5d/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/MigrationInfoSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/MigrationInfoSerializationImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/MigrationInfoSerializationImpl.java
index 4a349fd..3def798 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/MigrationInfoSerializationImpl.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/MigrationInfoSerializationImpl.java
@@ -35,6 +35,7 @@ import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamilyDef
 import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
 import org.apache.usergrid.persistence.core.astyanax.ScopedRowKeySerializer;
 import org.apache.usergrid.persistence.core.astyanax.StringRowCompositeSerializer;
+import org.apache.usergrid.persistence.core.migration.util.AstayanxUtils;
 import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.entity.SimpleId;
 
@@ -149,7 +150,8 @@ public class MigrationInfoSerializationImpl implements MigrationInfoSerializatio
             return 0;
         }
         catch ( ConnectionException e ) {
-            throw new DataMigrationException( "Unable to retrieve status", e );
+            AstayanxUtils.isKeyspaceMissing("Unable to connect to cassandra to retrieve status", e);
+            return 0;
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8fde7c5d/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/schema/MigrationManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/schema/MigrationManagerImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/schema/MigrationManagerImpl.java
index 31aa1b3..26351cf 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/schema/MigrationManagerImpl.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/schema/MigrationManagerImpl.java
@@ -28,6 +28,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamilyDefinition;
+import org.apache.usergrid.persistence.core.migration.util.AstayanxUtils;
 
 import com.google.common.collect.ImmutableMap;
 import com.google.inject.Inject;
@@ -131,22 +132,15 @@ public class MigrationManagerImpl implements MigrationManager {
 
         try {
             keyspaceDefinition = keyspace.describeKeyspace();
-        }
-        catch ( BadRequestException badRequestException ) {
-
-            //check if it's b/c the keyspace is missing, if so
-            final String message = badRequestException.getMessage();
-
-            boolean missingKeyspace = message.contains( "why:Keyspace" ) && message.contains( "does not exist" );
 
-            if ( !missingKeyspace ) {
-                throw badRequestException;
-            }
         }catch( NotFoundException nfe){
             //if we execute this immediately after a drop keyspace in 1.2.x, Cassandra is returning the NFE instead of a BadRequestException
             //swallow and log, then continue to create the keyspaces.
             logger.info( "Received a NotFoundException when attempting to describe keyspace.  It does not exist" );
         }
+        catch(Exception e){
+            AstayanxUtils.isKeyspaceMissing("Unable to connect to cassandra", e);
+        }
 
 
         if ( keyspaceDefinition != null ) {

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8fde7c5d/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/util/AstayanxUtils.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/util/AstayanxUtils.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/util/AstayanxUtils.java
new file mode 100644
index 0000000..7ae4748
--- /dev/null
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/util/AstayanxUtils.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.core.migration.util;
+
+
+import com.netflix.astyanax.connectionpool.exceptions.BadRequestException;
+
+
+public class AstayanxUtils {
+
+    /**
+     * Return true if the exception is an instance of a missing keysapce
+     * @param rethrowMessage The message to add to the exception if rethrown
+     * @param cassandraException The exception from cassandar
+     * @return
+     */
+    public static void isKeyspaceMissing(final String rethrowMessage,  final Exception cassandraException ) {
+
+        if ( cassandraException instanceof BadRequestException ) {
+
+            //check if it's b/c the keyspace is missing, if so
+            final String message = cassandraException.getMessage();
+
+            //no op, just swallow
+            if(message.contains( "why:Keyspace" ) && message.contains( "does not exist" )){
+                return;
+            };
+        }
+
+       throw new RuntimeException( rethrowMessage, cassandraException );
+    }
+}


[10/10] incubator-usergrid git commit: Merge remote-tracking branch 'origin/USERGRID-643' into two-dot-o-dev

Posted by gr...@apache.org.
Merge remote-tracking branch 'origin/USERGRID-643' into two-dot-o-dev


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/b28aeee0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/b28aeee0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/b28aeee0

Branch: refs/heads/two-dot-o-dev
Commit: b28aeee0522a3ebef8e2d349676ea381baf3ee37
Parents: b637535 53aa87c
Author: GERey <gr...@apigee.com>
Authored: Wed May 20 16:00:15 2015 -0700
Committer: GERey <gr...@apigee.com>
Committed: Wed May 20 16:00:15 2015 -0700

----------------------------------------------------------------------
 .../usergrid/corepersistence/CoreModule.java    |   4 +
 .../corepersistence/CpEntityManagerFactory.java |  14 -
 .../corepersistence/CpRelationManager.java      |   1 -
 .../asyncevents/EventBuilder.java               |   6 +-
 .../asyncevents/EventBuilderImpl.java           |  18 +-
 .../asyncevents/InMemoryAsyncEventService.java  |   7 +-
 .../asyncevents/SQSAsyncEventService.java       |   9 +-
 .../index/EdgeScopeSerializer.java              |  41 +++
 .../index/EntityIndexOperation.java             |  46 +++
 .../index/IndexProcessorFig.java                |   6 +-
 .../corepersistence/index/ReIndexAction.java    |   6 +-
 .../index/ReIndexRequestBuilder.java            |  86 +++++
 .../index/ReIndexRequestBuilderImpl.java        | 122 +++++++
 .../corepersistence/index/ReIndexService.java   |  89 +++--
 .../index/ReIndexServiceImpl.java               | 251 ++++++++++++--
 .../cursor/AbstractCursorSerializer.java        |   2 +-
 .../pipeline/cursor/CursorSerializerUtil.java   |  54 ++-
 .../pipeline/cursor/RequestCursor.java          |   9 +-
 .../pipeline/cursor/ResponseCursor.java         |  49 +--
 .../pipeline/read/AbstractPathFilter.java       |  30 --
 .../pipeline/read/CursorSeek.java               |  53 +++
 .../rx/impl/AllEntityIdsObservable.java         |   5 +-
 .../rx/impl/AllEntityIdsObservableImpl.java     |   6 +-
 .../util/SerializableMapper.java                |  91 -----
 .../persistence/EntityManagerFactory.java       |   2 -
 .../rx/EdgesToTargetObservableIT.java           |   4 +-
 .../PerformanceEntityRebuildIndexTest.java      | 346 ++++++++-----------
 .../data/MigrationInfoSerializationImpl.java    |   4 +-
 .../migration/schema/MigrationManagerImpl.java  |  14 +-
 .../core/migration/util/AstayanxUtils.java      |  49 +++
 .../graph/serialization/EdgesObservable.java    |  24 +-
 .../serialization/impl/EdgesObservableImpl.java |  10 +-
 .../impl/TargetIdObservableImpl.java            |   2 +-
 .../impl/migration/EdgeDataMigrationImpl.java   |   2 +-
 .../persistence/index/impl/IndexingUtils.java   |   2 +
 stack/pom.xml                                   |  13 +
 .../org/apache/usergrid/rest/IndexResource.java | 342 +++++++++---------
 .../main/resources/usergrid-rest-context.xml    |   3 -
 .../resources/usergrid-rest-deploy-context.xml  |   1 -
 39 files changed, 1138 insertions(+), 685 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b28aeee0/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b28aeee0/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b28aeee0/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java
----------------------------------------------------------------------


[04/10] incubator-usergrid git commit: Finishes changes before tests

Posted by gr...@apache.org.
Finishes changes before tests


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/20c9b350
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/20c9b350
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/20c9b350

Branch: refs/heads/two-dot-o-dev
Commit: 20c9b3509cf96a6ecab1a45a2c572fd6a041e00d
Parents: cb179d3
Author: Todd Nine <tn...@apigee.com>
Authored: Thu May 14 17:19:27 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Thu May 14 17:19:27 2015 -0600

----------------------------------------------------------------------
 .../asyncevents/InMemoryAsyncEventService.java  |   2 -
 .../index/EdgeScopeSerializer.java              |  41 ++++
 .../index/IndexProcessorFig.java                |   6 +-
 .../corepersistence/index/ReIndexService.java   |  75 +++---
 .../index/ReIndexServiceImpl.java               | 226 +++++++++++++++----
 .../pipeline/cursor/CursorSerializerUtil.java   |  54 ++++-
 .../pipeline/cursor/RequestCursor.java          |   9 +-
 .../pipeline/cursor/ResponseCursor.java         |  49 ++--
 .../pipeline/read/AbstractPathFilter.java       |  30 ---
 .../pipeline/read/CursorSeek.java               |  53 +++++
 .../rx/impl/AllEntityIdsObservable.java         |   4 +-
 .../rx/impl/AllEntityIdsObservableImpl.java     |   5 +-
 .../PerformanceEntityRebuildIndexTest.java      |   5 +-
 .../graph/serialization/EdgesObservable.java    |  21 +-
 .../serialization/impl/EdgesObservableImpl.java |   4 +-
 15 files changed, 422 insertions(+), 162 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/20c9b350/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
index 96966bf..ddcf826 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
@@ -99,8 +99,6 @@ public class InMemoryAsyncEventService implements AsyncEventService {
 
     @Override
     public void index( final EntityIndexOperation entityIndexOperation ) {
-
-
         run(eventBuilder.index( entityIndexOperation ));
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/20c9b350/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/EdgeScopeSerializer.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/EdgeScopeSerializer.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/EdgeScopeSerializer.java
new file mode 100644
index 0000000..2a6a5ac
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/EdgeScopeSerializer.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.index;
+
+
+import org.apache.usergrid.corepersistence.pipeline.cursor.AbstractCursorSerializer;
+import org.apache.usergrid.corepersistence.rx.impl.EdgeScope;
+
+
+/**
+ * Serialize our edge scope for cursors
+ */
+public class EdgeScopeSerializer extends AbstractCursorSerializer<EdgeScope> {
+
+
+    public static final EdgeScopeSerializer INSTANCE = new EdgeScopeSerializer();
+
+    @Override
+    protected Class<EdgeScope> getType() {
+        return EdgeScope.class;
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/20c9b350/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
index fe9d3fd..8e835e2 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
@@ -78,9 +78,9 @@ public interface IndexProcessorFig extends GuicyFig {
     String getQueueImplementation();
 
 
-    @Default("30000")
-    @Key("elasticsearch.reindex.sample.interval")
-    long getReIndexSampleInterval();
+    @Default("10000")
+    @Key("elasticsearch.reindex.flush.interval")
+    int getUpdateInterval();
 
 
     @Default("false")

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/20c9b350/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java
index b25eca5..f8955dd 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java
@@ -20,22 +20,6 @@
 package org.apache.usergrid.corepersistence.index;
 
 
-import java.util.UUID;
-import java.util.concurrent.Callable;
-import java.util.concurrent.FutureTask;
-
-import org.apache.usergrid.corepersistence.rx.impl.AllEntityIdsObservable;
-import org.apache.usergrid.corepersistence.rx.impl.EdgeScope;
-import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-
-import com.google.common.base.Optional;
-
-import rx.Observable;
-import rx.Observer;
-import rx.observables.ConnectableObservable;
-
-
 /**
  * An interface for re-indexing all entities in an application
  */
@@ -46,48 +30,75 @@ public interface ReIndexService {
      * Perform an index rebuild
      *
      * @param indexServiceRequestBuilder The builder to build the request
-     * @return
      */
-    IndexResponse rebuildIndex(final IndexServiceRequestBuilder indexServiceRequestBuilder);
+    IndexResponse rebuildIndex( final IndexServiceRequestBuilder indexServiceRequestBuilder );
 
 
     /**
      * Generate a build for the index
-     * @return
      */
     IndexServiceRequestBuilder getBuilder();
 
+
+    /**
+     * Get the status of a job
+     * @param jobId The jobId returned during the rebuild index
+     * @return
+     */
+    IndexResponse getStatus( final String jobId );
+
+
     /**
      * The response when requesting a re-index operation
      */
     class IndexResponse {
-        final String cursor;
-        final ConnectableObservable<EdgeScope> indexedEdgecount;
+        final String jobId;
+        final String status;
+        final long numberProcessed;
+        final long lastUpdated;
+
+
+        public IndexResponse( final String jobId, final String status, final long numberProcessed,
+                              final long lastUpdated ) {
+            this.jobId = jobId;
+            this.status = status;
+            this.numberProcessed = numberProcessed;
+            this.lastUpdated = lastUpdated;
+        }
 
 
-        public IndexResponse( final String cursor, final ConnectableObservable<EdgeScope> indexedEdgecount ) {
-            this.cursor = cursor;
-            this.indexedEdgecount = indexedEdgecount;
+        /**
+         * Get the jobId used to resume this operation
+         */
+        public String getJobId() {
+            return jobId;
+        }
+
+
+        /**
+         * Get the last updated time, as a long
+         * @return
+         */
+        public long getLastUpdated() {
+            return lastUpdated;
         }
 
 
         /**
-         * Get the cursor used to resume this operation
+         * Get the number of records processed
          * @return
          */
-        public String getCursor() {
-            return cursor;
+        public long getNumberProcessed() {
+            return numberProcessed;
         }
 
 
         /**
-         * Return the observable of all edges to be indexed.
-         *
-         * Note that after subscribing "connect" will need to be called to ensure that processing begins
+         * Get the status
          * @return
          */
-        public ConnectableObservable<EdgeScope> getCount() {
-            return indexedEdgecount;
+        public String getStatus() {
+            return status;
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/20c9b350/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
index a2fa09a..d828fc2 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
@@ -20,29 +20,32 @@
 package org.apache.usergrid.corepersistence.index;
 
 
-import java.util.concurrent.TimeUnit;
+import java.util.List;
 
 import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
+import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializerUtil;
+import org.apache.usergrid.corepersistence.pipeline.read.CursorSeek;
 import org.apache.usergrid.corepersistence.rx.impl.AllApplicationsObservable;
 import org.apache.usergrid.corepersistence.rx.impl.AllEntityIdsObservable;
 import org.apache.usergrid.corepersistence.rx.impl.EdgeScope;
 import org.apache.usergrid.corepersistence.util.CpNamingUtils;
-import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
-import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.core.util.StringUtils;
+import org.apache.usergrid.persistence.graph.Edge;
 import org.apache.usergrid.persistence.map.MapManager;
 import org.apache.usergrid.persistence.map.MapManagerFactory;
 import org.apache.usergrid.persistence.map.MapScope;
 import org.apache.usergrid.persistence.map.impl.MapScopeImpl;
 import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 
+import com.fasterxml.jackson.databind.JsonNode;
 import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 
 import rx.Observable;
-import rx.observables.ConnectableObservable;
+import rx.schedulers.Schedulers;
 
 
 @Singleton
@@ -51,14 +54,18 @@ public class ReIndexServiceImpl implements ReIndexService {
     private static final MapScope RESUME_MAP_SCOPE =
         new MapScopeImpl( CpNamingUtils.getManagementApplicationId(), "reindexresume" );
 
-    //Keep cursors to resume re-index for 1 day.  This is far beyond it's useful real world implications anyway.
+    //Keep cursors to resume re-index for 10 days.  This is far beyond it's useful real world implications anyway.
     private static final int INDEX_TTL = 60 * 60 * 24 * 10;
 
+    private static final String MAP_CURSOR_KEY = "cursor";
+    private static final String MAP_COUNT_KEY = "count";
+    private static final String MAP_STATUS_KEY = "status";
+    private static final String MAP_UPDATED_KEY = "lastUpdated";
+
 
     private final AllApplicationsObservable allApplicationsObservable;
     private final AllEntityIdsObservable allEntityIdsObservable;
     private final IndexProcessorFig indexProcessorFig;
-    private final RxTaskScheduler rxTaskScheduler;
     private final MapManager mapManager;
     private final AsyncEventService indexService;
 
@@ -66,69 +73,61 @@ public class ReIndexServiceImpl implements ReIndexService {
     @Inject
     public ReIndexServiceImpl( final AllEntityIdsObservable allEntityIdsObservable,
                                final MapManagerFactory mapManagerFactory,
-                               final AllApplicationsObservable allApplicationsObservable, final IndexProcessorFig indexProcessorFig,
-                               final RxTaskScheduler rxTaskScheduler, final AsyncEventService indexService ) {
+                               final AllApplicationsObservable allApplicationsObservable,
+                               final IndexProcessorFig indexProcessorFig, final AsyncEventService indexService ) {
         this.allEntityIdsObservable = allEntityIdsObservable;
         this.allApplicationsObservable = allApplicationsObservable;
         this.indexProcessorFig = indexProcessorFig;
-        this.rxTaskScheduler = rxTaskScheduler;
         this.indexService = indexService;
 
         this.mapManager = mapManagerFactory.createMapManager( RESUME_MAP_SCOPE );
     }
 
 
-
-
-
     @Override
     public IndexResponse rebuildIndex( final IndexServiceRequestBuilder indexServiceRequestBuilder ) {
 
-          //load our last emitted Scope if a cursor is present
-        if ( indexServiceRequestBuilder.getCursor().isPresent() ) {
-            throw new UnsupportedOperationException( "Build this" );
-        }
+        //load our last emitted Scope if a cursor is present
 
+        final Optional<EdgeScope> cursor = parseCursor( indexServiceRequestBuilder.getCursor() );
+
+
+        final CursorSeek<Edge> cursorSeek = getResumeEdge( cursor );
 
         final Optional<ApplicationScope> appId = indexServiceRequestBuilder.getApplicationScope();
-        final Observable<ApplicationScope>  applicationScopes = appId.isPresent()? Observable.just( appId.get() ) : allApplicationsObservable.getData();
 
 
+        Preconditions.checkArgument( cursor.isPresent() && appId.isPresent(),
+            "You cannot specify an app id and a cursor.  When resuming with cursor you must omit the appid" );
+
+        final Observable<ApplicationScope> applicationScopes = getApplications( cursor, appId );
 
 
-        final String newCursor = StringUtils.sanitizeUUID( UUIDGenerator.newTimeUUID() );
+        final String jobId = StringUtils.sanitizeUUID( UUIDGenerator.newTimeUUID() );
 
         final long modifiedSince = indexServiceRequestBuilder.getUpdateTimestamp().or( Long.MIN_VALUE );
 
         //create an observable that loads each entity and indexes it, start it running with publish
-        final ConnectableObservable<EdgeScope> runningReIndex =
-            allEntityIdsObservable.getEdgesToEntities( applicationScopes,
-                indexServiceRequestBuilder.getCollectionName() )
-
-                //for each edge, create our scope and index on it
-                .doOnNext( edge -> indexService.index(
-                    new EntityIndexOperation( edge.getApplicationScope(), edge.getEdge().getTargetNode(),
-                        modifiedSince ) ) ).publish();
+        final Observable<EdgeScope> runningReIndex = allEntityIdsObservable.getEdgesToEntities( applicationScopes,
+            indexServiceRequestBuilder.getCollectionName(), cursorSeek.getSeekValue() )
 
+            //for each edge, create our scope and index on it
+            .doOnNext( edge -> indexService.index(
+                new EntityIndexOperation( edge.getApplicationScope(), edge.getEdge().getTargetNode(),
+                    modifiedSince ) ) );
 
 
         //start our sampler and state persistence
         //take a sample every sample interval to allow us to resume state with minimal loss
-        runningReIndex.sample( indexProcessorFig.getReIndexSampleInterval(), TimeUnit.MILLISECONDS,
-            rxTaskScheduler.getAsyncIOScheduler() )
-            .doOnNext( edge -> {
-
-//                final String serializedState = SerializableMapper.asString( edge );
-//
-//                mapManager.putString( newCursor, serializedState, INDEX_TTL );
-            } ).subscribe();
+        runningReIndex.buffer( indexProcessorFig.getUpdateInterval() )
+            //create our flushing collector and flush the edge scopes to it
+            .collect( () -> new FlushingCollector( jobId ),
+                ( ( flushingCollector, edgeScopes ) -> flushingCollector.flushBuffer( edgeScopes ) ) ).doOnNext( flushingCollector-> flushingCollector.complete() )
+                //subscribe on our I/O scheduler and run the task
+            .subscribeOn( Schedulers.io() ).subscribe();
 
 
-        //start pushing to both
-        runningReIndex.connect();
-
-
-        return new IndexResponse( newCursor, runningReIndex );
+        return new IndexResponse( jobId, "Started", 0, 0 );
     }
 
 
@@ -136,6 +135,155 @@ public class ReIndexServiceImpl implements ReIndexService {
     public IndexServiceRequestBuilder getBuilder() {
         return new IndexServiceRequestBuilderImpl();
     }
+
+
+    @Override
+    public IndexResponse getStatus( final String jobId ) {
+        Preconditions.checkNotNull( jobId, "jobId must not be null" );
+        return getIndexResponse( jobId );
+    }
+
+
+    /**
+     * Simple collector that counts state, then flushed every time a buffer is provided.  Writes final state when complete
+     */
+    private class FlushingCollector {
+
+        private final String jobId;
+        private long count;
+
+
+        private FlushingCollector( final String jobId ) {
+            this.jobId = jobId;
+        }
+
+
+        public void flushBuffer( final List<EdgeScope> buffer ) {
+            count += buffer.size();
+
+            //write our cursor state
+            if ( buffer.size() > 0 ) {
+                writeCursorState( jobId, buffer.get( buffer.size() - 1 ) );
+            }
+
+            writeStateMeta( jobId, "InProgress", count, System.currentTimeMillis() );
+        }
+
+        public void complete(){
+            writeStateMeta( jobId, "Complete", count, System.currentTimeMillis() );
+        }
+    }
+
+
+    /**
+     * Get the resume edge scope
+     *
+     * @param edgeScope The optional edge scope from the cursor
+     */
+    private CursorSeek<Edge> getResumeEdge( final Optional<EdgeScope> edgeScope ) {
+
+
+        if ( edgeScope.isPresent() ) {
+            return new CursorSeek<>( Optional.of( edgeScope.get().getEdge() ) );
+        }
+
+        return new CursorSeek<>( Optional.absent() );
+    }
+
+
+    /**
+     * Generate an observable for our appliation scope
+     */
+    private Observable<ApplicationScope> getApplications( final Optional<EdgeScope> cursor,
+                                                          final Optional<ApplicationScope> appId ) {
+        //cursor is present use it and skip until we hit that app
+        if ( cursor.isPresent() ) {
+
+            final EdgeScope cursorValue = cursor.get();
+            //we have a cursor and an application scope that was used.
+            return allApplicationsObservable.getData().skipWhile(
+                applicationScope -> !cursorValue.getApplicationScope().equals( applicationScope ) );
+        }
+        //this is intentional.  If
+        else if ( appId.isPresent() ) {
+            return Observable.just( appId.get() );
+        }
+
+        return allApplicationsObservable.getData();
+    }
+
+
+    /**
+     * Swap our cursor for an optional edgescope
+     */
+    private Optional<EdgeScope> parseCursor( final Optional<String> cursor ) {
+
+        if ( !cursor.isPresent() ) {
+            return Optional.absent();
+        }
+
+        //get our cursor
+        final String persistedCursor = mapManager.getString( cursor.get() );
+
+        if ( persistedCursor == null ) {
+            return Optional.absent();
+        }
+
+        final JsonNode node = CursorSerializerUtil.fromString( persistedCursor );
+
+        final EdgeScope edgeScope = EdgeScopeSerializer.INSTANCE.fromJsonNode( node, CursorSerializerUtil.getMapper() );
+
+        return Optional.of( edgeScope );
+    }
+
+
+    /**
+     * Write the cursor state to the map in cassandra
+     */
+    private void writeCursorState( final String jobId, final EdgeScope edge ) {
+
+        final JsonNode node = EdgeScopeSerializer.INSTANCE.toNode( CursorSerializerUtil.getMapper(), edge );
+
+        final String serializedState = CursorSerializerUtil.asString( node );
+
+        mapManager.putString( jobId + MAP_CURSOR_KEY, serializedState, INDEX_TTL );
+    }
+
+
+    /**
+     * Write our state meta data into cassandra so everyone can see it
+     * @param jobId
+     * @param status
+     * @param processedCount
+     * @param lastUpdated
+     */
+    private void writeStateMeta( final String jobId, final String status, final long processedCount,
+                                 final long lastUpdated ) {
+
+        mapManager.putString( jobId + MAP_STATUS_KEY, status );
+        mapManager.putLong( jobId + MAP_COUNT_KEY, processedCount );
+        mapManager.putLong( jobId + MAP_UPDATED_KEY, lastUpdated );
+    }
+
+
+    /**
+     * Get the index response from the jobId
+     * @param jobId
+     * @return
+     */
+    private IndexResponse getIndexResponse( final String jobId ) {
+
+        final String status = mapManager.getString( jobId+MAP_STATUS_KEY );
+
+        if(status == null){
+            throw new IllegalArgumentException( "Could not find a job with id " + jobId );
+        }
+
+        final long processedCount = mapManager.getLong( jobId + MAP_COUNT_KEY );
+        final long lastUpdated = mapManager.getLong( jobId + MAP_COUNT_KEY );
+
+        return new IndexResponse( jobId, status, processedCount, lastUpdated );
+    }
 }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/20c9b350/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorSerializerUtil.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorSerializerUtil.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorSerializerUtil.java
index fea0364..7acdd00 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorSerializerUtil.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorSerializerUtil.java
@@ -20,10 +20,15 @@
 package org.apache.usergrid.corepersistence.pipeline.cursor;
 
 
-import com.fasterxml.jackson.core.Base64Variant;
-import com.fasterxml.jackson.core.Base64Variants;
+import java.io.IOException;
+import java.util.Base64;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.fasterxml.jackson.dataformat.smile.SmileFactory;
+import com.google.common.base.Preconditions;
 
 
 /**
@@ -35,9 +40,54 @@ public class CursorSerializerUtil {
 
     private static final ObjectMapper MAPPER = new ObjectMapper( SMILE_FACTORY );
 
+    /**
+     * Aritrary number, just meant to keep us from having a DOS issue
+     */
+    private static final int MAX_SIZE = 1024;
+
 
     public static ObjectMapper getMapper() {
         return MAPPER;
     }
 
+
+    /**
+     * Turn the json node in to a base64 encoded SMILE binary
+     */
+    public static String asString( final JsonNode node ) {
+        final byte[] output;
+        try {
+            output = MAPPER.writeValueAsBytes( node );
+        }
+        catch ( JsonProcessingException e ) {
+            throw new RuntimeException( "Unable to create output from json node " + node );
+        }
+
+        //generate a base64 url save string
+        final String value = Base64.getUrlEncoder().encodeToString( output );
+
+        return value;
+    }
+
+
+    /**
+     * Parse the base64 encoded binary string
+     */
+    public static JsonNode fromString( final String base64EncodedJson ) {
+
+        Preconditions.checkArgument( base64EncodedJson.length() <= MAX_SIZE,
+            "Your cursor must be less than " + MAX_SIZE + " chars in length" );
+
+        final byte[] data = Base64.getUrlDecoder().decode( base64EncodedJson );
+
+        JsonNode jsonNode;
+        try {
+            jsonNode =  MAPPER.readTree( data );
+        }
+        catch ( IOException e ) {
+            throw new RuntimeException( "Unable to parse json node from string " + base64EncodedJson );
+        }
+
+        return jsonNode;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/20c9b350/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/RequestCursor.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/RequestCursor.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/RequestCursor.java
index 870edbb..dc6ae71 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/RequestCursor.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/RequestCursor.java
@@ -37,10 +37,6 @@ import com.google.common.base.Preconditions;
  */
 public class RequestCursor {
 
-    /**
-     * Aritrary number, just meant to keep us from having a DOS issue
-     */
-    private static final int MAX_SIZE = 1024;
 
     private static final int MAX_CURSOR_COUNT = 100;
 
@@ -83,11 +79,8 @@ public class RequestCursor {
         try {
 
 
-            Preconditions.checkArgument( cursor.length() <= MAX_SIZE, "Your cursor must be less than " + MAX_SIZE + " chars in length");
-
-            final byte[] data = Base64.getUrlDecoder().decode( cursor );
 
-            JsonNode jsonNode = MAPPER.readTree( data );
+            JsonNode jsonNode = CursorSerializerUtil.fromString( cursor );
 
 
             Preconditions

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/20c9b350/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/ResponseCursor.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/ResponseCursor.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/ResponseCursor.java
index dbd8b88..dc4bf39 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/ResponseCursor.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/ResponseCursor.java
@@ -20,11 +20,8 @@
 package org.apache.usergrid.corepersistence.pipeline.cursor;
 
 
-import java.util.Base64;
-
 import org.apache.usergrid.corepersistence.pipeline.read.EdgePath;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -53,8 +50,8 @@ public class ResponseCursor {
 
 
     /**
-     * Lazyily encoded deliberately.  If the user doesn't care about a cursor and is using streams, we dont' want to take the
-     * time to calculate it
+     * Lazyily encoded deliberately.  If the user doesn't care about a cursor and is using streams, we dont' want to
+     * take the time to calculate it
      */
     public Optional<String> encodeAsString() {
 
@@ -68,42 +65,34 @@ public class ResponseCursor {
             return encodedValue;
         }
 
+        //no edge path, short circuit
 
-        try {
-
-            //no edge path, short circuit
-
-            final ObjectNode map = MAPPER.createObjectNode();
+        final ObjectNode map = MAPPER.createObjectNode();
 
 
-            Optional<EdgePath> current = edgePath;
+        Optional<EdgePath> current = edgePath;
 
 
-            //traverse each edge and add them to our json
-            do {
+        //traverse each edge and add them to our json
+        do {
 
-                final EdgePath edgePath = current.get();
-                final Object cursorValue = edgePath.getCursorValue();
-                final CursorSerializer serializer = edgePath.getSerializer();
-                final int filterId = edgePath.getFilterId();
+            final EdgePath edgePath = current.get();
+            final Object cursorValue = edgePath.getCursorValue();
+            final CursorSerializer serializer = edgePath.getSerializer();
+            final int filterId = edgePath.getFilterId();
 
-                final JsonNode serialized = serializer.toNode( MAPPER, cursorValue );
-                map.put( String.valueOf( filterId ), serialized );
+            final JsonNode serialized = serializer.toNode( MAPPER, cursorValue );
+            map.put( String.valueOf( filterId ), serialized );
 
-                current = current.get().getPrevious();
-            }
-            while ( current.isPresent() );
+            current = current.get().getPrevious();
+        }
+        while ( current.isPresent() );
 
-            final byte[] output = MAPPER.writeValueAsBytes( map );
+        //generate a base64 url save string
+        final String value = CursorSerializerUtil.asString( map );
 
-            //generate a base64 url save string
-            final String value = Base64.getUrlEncoder().encodeToString( output );
+        encodedValue = Optional.of( value );
 
-            encodedValue =  Optional.of( value );
-        }
-        catch ( JsonProcessingException e ) {
-            throw new CursorParseException( "Unable to serialize cursor", e );
-        }
 
         return encodedValue;
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/20c9b350/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractPathFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractPathFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractPathFilter.java
index c68dc4a..0f9ac9b 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractPathFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractPathFilter.java
@@ -76,34 +76,4 @@ public abstract class AbstractPathFilter<T, R, C extends Serializable> extends A
      * Return the class to be used when parsing the cursor
      */
     protected abstract CursorSerializer<C> getCursorSerializer();
-
-
-    /**
-     * An internal class that holds a mutable state.  When resuming, we only ever honor the seek value on the first call.  Afterwards, we will seek from the beginning on newly emitted values.
-     * Calling get will return the first value to seek, or absent if not specified.  Subsequent calls will return absent.  Callers should treat the results as seek values for each operation
-     */
-    protected static class CursorSeek<C> {
-
-        private Optional<C> seek;
-
-        private CursorSeek(final Optional<C> cursorValue){
-            seek = cursorValue;
-        }
-
-
-        /**
-         * Get the seek value to use when searching
-         * @return
-         */
-        public Optional<C> getSeekValue(){
-            final Optional<C> toReturn = seek;
-
-            seek = Optional.absent();
-
-            return toReturn;
-        }
-
-
-
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/20c9b350/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CursorSeek.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CursorSeek.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CursorSeek.java
new file mode 100644
index 0000000..b803658
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CursorSeek.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read;
+
+
+import com.google.common.base.Optional;
+
+
+/**
+ * An internal class that holds a mutable state.  When resuming, we only ever honor the seek value on the first call.  Afterwards, we will seek from the beginning on newly emitted values.
+ * Calling get will return the first value to seek, or absent if not specified.  Subsequent calls will return absent.  Callers should treat the results as seek values for each operation
+ */
+public class CursorSeek<C> {
+
+    private Optional<C> seek;
+
+    public CursorSeek( final Optional<C> cursorValue ){
+        seek = cursorValue;
+    }
+
+
+    /**
+     * Get the seek value to use when searching
+     * @return
+     */
+    public Optional<C> getSeekValue(){
+        final Optional<C> toReturn = seek;
+
+        seek = Optional.absent();
+
+        return toReturn;
+    }
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/20c9b350/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservable.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservable.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservable.java
index aada240..9070609 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservable.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservable.java
@@ -24,6 +24,7 @@ import  com.google.common.base.Optional;
 
 import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.graph.Edge;
 
 import rx.Observable;
 
@@ -44,8 +45,9 @@ public interface AllEntityIdsObservable {
      * Get all edges that represent edges to entities in the system
      * @param appScopes
      * @param edgeType The edge type to use (if specified)
+     * @param lastEdge The edge to resume processing from
      * @return
      */
-    Observable<EdgeScope> getEdgesToEntities(final Observable<ApplicationScope> appScopes, final Optional<String> edgeType);
+    Observable<EdgeScope> getEdgesToEntities(final Observable<ApplicationScope> appScopes, final Optional<String> edgeType, final Optional<Edge> lastEdge);
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/20c9b350/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservableImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservableImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservableImpl.java
index 6a95e7b..0420a32 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservableImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservableImpl.java
@@ -28,6 +28,7 @@ import com.google.inject.Singleton;
 
 import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.graph.Edge;
 import org.apache.usergrid.persistence.graph.GraphManager;
 import org.apache.usergrid.persistence.graph.GraphManagerFactory;
 import org.apache.usergrid.persistence.graph.serialization.EdgesObservable;
@@ -81,12 +82,12 @@ public class AllEntityIdsObservableImpl implements AllEntityIdsObservable {
 
 
     @Override
-    public Observable<EdgeScope> getEdgesToEntities( final Observable<ApplicationScope> appScopes, final Optional<String> edgeType) {
+    public Observable<EdgeScope> getEdgesToEntities( final Observable<ApplicationScope> appScopes, final Optional<String> edgeType, final Optional<Edge> lastEdge) {
 
         return appScopes.flatMap( applicationScope -> {
             final GraphManager gm = graphManagerFactory.createEdgeManager( applicationScope );
 
-            return edgesObservable.edgesFromSourceDescending( gm, applicationScope.getApplication(), edgeType )
+            return edgesObservable.edgesFromSourceDescending( gm, applicationScope.getApplication(), edgeType, lastEdge )
                                   .map( edge -> new EdgeScope(applicationScope, edge ));
         } );
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/20c9b350/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java b/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
index a17c925..cb9919f 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
@@ -27,6 +27,7 @@ import java.util.concurrent.TimeUnit;
 import com.google.common.base.Optional;
 import org.apache.commons.lang.RandomStringUtils;
 
+import org.apache.usergrid.corepersistence.index.IndexServiceRequestBuilder;
 import org.apache.usergrid.corepersistence.index.ReIndexService;
 import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
 import org.junit.After;
@@ -196,7 +197,9 @@ public class PerformanceEntityRebuildIndexTest extends AbstractCoreIT {
 
         try {
 
-            reIndexService.rebuildIndex( Optional.of( em.getApplicationId()), Optional.<String>of("catherders"), Optional.absent(), Optional.absent() );
+            final IndexServiceRequestBuilder builder = reIndexService.getBuilder().withApplicationId( em.getApplicationId() ).withCollection( "catherders" );
+
+            reIndexService.rebuildIndex(builder );
 
             reporter.report();
             registry.remove( meterName );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/20c9b350/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java
index 964e13d..78a1d4b 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java
@@ -42,16 +42,6 @@ public interface EdgesObservable {
 
 
     /**
-     * Return an observable of all edges from a source node.  Ordered ascending, from the startTimestamp if specified
-     * @param gm
-     * @param sourceNode
-     * @param edgeType The edge type if specified.  Otherwise all types will be used
-     * @return
-     */
-    Observable<Edge> edgesFromSourceDescending( final GraphManager gm, final Id sourceNode,
-                                                final Optional<String> edgeType );
-
-    /**
      * Get all edges from the source node with the target type
      * @param gm
      * @param sourceNode
@@ -67,4 +57,15 @@ public interface EdgesObservable {
      * @return
      */
     Observable<Edge> edgesToTarget(final GraphManager gm,  final Id targetNode);
+
+    /**
+     * Return an observable of all edges from a source node.  Ordered ascending, from the startTimestamp if specified
+     * @param gm
+     * @param sourceNode
+     * @param edgeType The edge type if specified.  Otherwise all types will be used
+     * @param resume The edge to start seeking after.  Otherwise starts at the most recent
+     * @return
+     */
+    Observable<Edge> edgesFromSourceDescending( final GraphManager gm, final Id sourceNode,
+                                                final Optional<String> edgeType, final Optional<Edge> resume );
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/20c9b350/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java
index 7240798..18274ac 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java
@@ -72,7 +72,7 @@ public class EdgesObservableImpl implements EdgesObservable {
 
     @Override
     public Observable<Edge> edgesFromSourceDescending( final GraphManager gm, final Id sourceNode,
-                                                       final Optional<String> edgeTypeInput ) {
+                                                       final Optional<String> edgeTypeInput, final Optional<Edge> resume  ) {
 
 
 
@@ -86,7 +86,7 @@ public class EdgesObservableImpl implements EdgesObservable {
 
                 return gm.loadEdgesFromSource(
                     new SimpleSearchByEdgeType( sourceNode, edgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
-                        Optional.<Edge>absent() ) );
+                       resume ) );
         } );
     }
 


[03/10] incubator-usergrid git commit: Refactored observable methods to correct name

Posted by gr...@apache.org.
Refactored observable methods to correct name

Added timestamp to message so that consumers can filter values for faster processing


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/cb179d35
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/cb179d35
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/cb179d35

Branch: refs/heads/two-dot-o-dev
Commit: cb179d3512952203d20c00773789df23e27f147d
Parents: b1d9ac2
Author: Todd Nine <tn...@apigee.com>
Authored: Tue May 12 17:40:20 2015 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Tue May 12 17:40:20 2015 -0700

----------------------------------------------------------------------
 .../asyncevents/EventBuilder.java               |   6 +-
 .../asyncevents/EventBuilderImpl.java           |  18 ++-
 .../asyncevents/InMemoryAsyncEventService.java  |   5 +-
 .../asyncevents/SQSAsyncEventService.java       |   9 +-
 .../index/EntityIndexOperation.java             |  46 +++++++
 .../index/IndexServiceRequestBuilder.java       |  88 +++++++++++++
 .../index/IndexServiceRequestBuilderImpl.java   | 130 +++++++++++++++++++
 .../corepersistence/index/ReIndexAction.java    |   2 +-
 .../corepersistence/index/ReIndexService.java   |  13 +-
 .../index/ReIndexServiceImpl.java               |  44 ++++---
 .../rx/impl/AllEntityIdsObservable.java         |   3 +-
 .../rx/impl/AllEntityIdsObservableImpl.java     |   5 +-
 .../util/SerializableMapper.java                |  91 -------------
 .../rx/EdgesToTargetObservableIT.java           |   4 +-
 .../graph/serialization/EdgesObservable.java    |   7 +-
 .../serialization/impl/EdgesObservableImpl.java |   8 +-
 .../impl/TargetIdObservableImpl.java            |   2 +-
 .../impl/migration/EdgeDataMigrationImpl.java   |   2 +-
 18 files changed, 342 insertions(+), 141 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cb179d35/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java
index f48451c..f9f157e 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java
@@ -22,8 +22,8 @@ package org.apache.usergrid.corepersistence.asyncevents;
 
 import java.util.List;
 
+import org.apache.usergrid.corepersistence.index.EntityIndexOperation;
 import org.apache.usergrid.persistence.collection.MvccLogEntry;
-import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.graph.Edge;
 import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
@@ -72,10 +72,10 @@ public interface EventBuilder {
 
     /**
      * Re-index an entity in the scope provided
-     * @param entityIdScope
+     * @param entityIndexOperation
      * @return
      */
-    Observable<IndexOperationMessage> index( EntityIdScope entityIdScope );
+    Observable<IndexOperationMessage> index( EntityIndexOperation entityIndexOperation );
 
     /**
      * A bean to hold both our observables so the caller can choose the subscription mechanism.  Note that

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cb179d35/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
index c0d82d2..d35ed6d 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
@@ -25,12 +25,13 @@ import java.util.List;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.usergrid.corepersistence.index.EntityIndexOperation;
 import org.apache.usergrid.corepersistence.index.IndexService;
+import org.apache.usergrid.persistence.Schema;
 import org.apache.usergrid.persistence.collection.EntityCollectionManager;
 import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
 import org.apache.usergrid.persistence.collection.MvccLogEntry;
 import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
-import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.graph.Edge;
 import org.apache.usergrid.persistence.graph.GraphManager;
@@ -38,6 +39,7 @@ import org.apache.usergrid.persistence.graph.GraphManagerFactory;
 import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
 import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.field.Field;
 
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
@@ -140,14 +142,20 @@ public class EventBuilderImpl implements EventBuilder {
 
 
     @Override
-    public Observable<IndexOperationMessage> index( final EntityIdScope entityIdScope ) {
+    public Observable<IndexOperationMessage> index( final EntityIndexOperation entityIndexOperation ) {
 
-        final ApplicationScope applicationScope = entityIdScope.getApplicationScope();
+        final ApplicationScope applicationScope = entityIndexOperation.getApplicationScope();
 
-        final Id entityId = entityIdScope.getId();
+        final Id entityId = entityIndexOperation.getId();
 
         //load the entity
-        return entityCollectionManagerFactory.createCollectionManager( applicationScope ).load( entityId )
+        return entityCollectionManagerFactory.createCollectionManager( applicationScope ).load( entityId ).filter(
+            entity -> {
+                final Field<Long> modified = entity.getField( Schema.PROPERTY_MODIFIED );
+
+                //only re-index if it has been updated and been updated after our timestamp
+                return modified != null && modified.getValue() >= entityIndexOperation.getUpdatedSince();
+            } )
             //perform indexing on the task scheduler and start it
             .flatMap( entity -> indexService.indexEntity( applicationScope, entity ) );
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cb179d35/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
index 6faa695..96966bf 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
@@ -23,6 +23,7 @@ package org.apache.usergrid.corepersistence.asyncevents;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.usergrid.corepersistence.index.EntityIndexOperation;
 import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
 import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
@@ -97,10 +98,10 @@ public class InMemoryAsyncEventService implements AsyncEventService {
 
 
     @Override
-    public void index( final EntityIdScope entityIdScope ) {
+    public void index( final EntityIndexOperation entityIndexOperation ) {
 
 
-        run(eventBuilder.index( entityIdScope ));
+        run(eventBuilder.index( entityIndexOperation ));
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cb179d35/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/SQSAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/SQSAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/SQSAsyncEventService.java
index 415e5e8..1dbfd4e 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/SQSAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/SQSAsyncEventService.java
@@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.usergrid.corepersistence.index.EntityIndexOperation;
 import org.apache.usergrid.corepersistence.index.IndexProcessorFig;
 import org.apache.usergrid.corepersistence.index.IndexService;
 import org.apache.usergrid.exception.NotImplementedException;
@@ -184,7 +185,7 @@ public class SQSAsyncEventService implements AsyncEventService {
     }
 
 
-    @Override
+//    @Override
     public void index( final EntityIdScope entityIdScope ) {
         //queue the re-inex operation
         offer( entityIdScope );
@@ -346,4 +347,10 @@ public class SQSAsyncEventService implements AsyncEventService {
             subscriptions.add( subscription );
         }
     }
+
+
+    @Override
+    public void index( final EntityIndexOperation entityIdScope ) {
+        throw new NotImplementedException( "Implement me" );
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cb179d35/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/EntityIndexOperation.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/EntityIndexOperation.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/EntityIndexOperation.java
new file mode 100644
index 0000000..3548bbe
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/EntityIndexOperation.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.index;
+
+
+import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+
+/**
+ * The operation for re-indexing an entity.  The entity should be updated
+ * with an updated timestamp > updatedSince.
+ */
+public class EntityIndexOperation extends EntityIdScope {
+
+    private final long updatedSince;
+
+
+    public EntityIndexOperation( final ApplicationScope applicationScope, final Id id, final long updatedSince ) {
+        super( applicationScope, id );
+        this.updatedSince = updatedSince;
+    }
+
+
+    public long getUpdatedSince() {
+        return updatedSince;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cb179d35/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceRequestBuilder.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceRequestBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceRequestBuilder.java
new file mode 100644
index 0000000..07160d8
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceRequestBuilder.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.index;
+
+
+import java.util.UUID;
+
+import org.elasticsearch.action.index.IndexRequestBuilder;
+
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+
+import com.google.common.base.Optional;
+
+
+/**
+ * A builder interface to build our re-index request
+ */
+public interface IndexServiceRequestBuilder {
+
+    /**
+     * Set the application id
+     */
+    IndexServiceRequestBuilder withApplicationId( final UUID applicationId );
+
+    /**
+     * Set the collection name.  If not set, every collection will be reindexed
+     * @param collectionName
+     * @return
+     */
+    IndexServiceRequestBuilder withCollection( final String collectionName );
+
+    /**
+     * Set our cursor to resume processing
+     * @param cursor
+     * @return
+     */
+    IndexServiceRequestBuilder withCursor(final String cursor);
+
+
+    /**
+     * Set the timestamp to re-index entities updated >= this timestamp
+     * @param timestamp
+     * @return
+     */
+    IndexServiceRequestBuilder withStartTimestamp(final Long timestamp);
+
+
+    /**
+     * Get the application scope
+     * @return
+     */
+    Optional<ApplicationScope> getApplicationScope();
+
+    /**
+     * Get the collection name
+     * @return
+     */
+    Optional<String> getCollectionName();
+
+    /**
+     * Get the cursor
+     * @return
+     */
+    Optional<String> getCursor();
+
+    /**
+     * Get the updated since timestamp
+     * @return
+     */
+    Optional<Long> getUpdateTimestamp();
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cb179d35/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceRequestBuilderImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceRequestBuilderImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceRequestBuilderImpl.java
new file mode 100644
index 0000000..3466674
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceRequestBuilderImpl.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.index;
+
+
+import java.util.UUID;
+
+
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+
+import com.google.common.base.Optional;
+
+
+public class IndexServiceRequestBuilderImpl implements IndexServiceRequestBuilder {
+
+
+    /**
+     *
+     final Observable<ApplicationScope>  applicationScopes = appId.isPresent()? Observable.just( getApplicationScope(appId.get()) ) : allApplicationsObservable.getData();
+
+     final String newCursor = StringUtils.sanitizeUUID( UUIDGenerator.newTimeUUID() );
+
+     //create an observable that loads each entity and indexes it, start it running with publish
+     final ConnectableObservable<EdgeScope> runningReIndex =
+         allEntityIdsObservable.getEdgesToEntities( applicationScopes, collection, startTimestamp )
+
+             //for each edge, create our scope and index on it
+             .doOnNext( edge -> indexService.index( new EntityIdScope( edge.getApplicationScope(), edge.getEdge().getTargetNode() ) ) ).publish();
+
+
+
+     //start our sampler and state persistence
+     //take a sample every sample interval to allow us to resume state with minimal loss
+     runningReIndex.sample( indexProcessorFig.getReIndexSampleInterval(), TimeUnit.MILLISECONDS,
+         rxTaskScheduler.getAsyncIOScheduler() )
+         .doOnNext( edge -> {
+
+             final String serializedState = SerializableMapper.asString( edge );
+
+             mapManager.putString( newCursor, serializedState, INDEX_TTL );
+         } ).subscribe();
+
+
+     */
+
+    private Optional<UUID> withApplicationId;
+    private Optional<String> withCollectionName;
+    private Optional<String> cursor;
+    private Optional<Long> updateTimestamp;
+
+
+    /***
+     *
+     * @param applicationId
+     * @return
+     */
+    @Override
+    public IndexServiceRequestBuilder withApplicationId( final UUID applicationId ) {
+        this.withApplicationId = Optional.fromNullable(applicationId);
+        return this;
+    }
+
+
+    @Override
+    public IndexServiceRequestBuilder withCollection( final String collectionName ) {
+        this.withCollectionName = Optional.fromNullable( collectionName );
+        return this;
+    }
+
+
+    @Override
+    public IndexServiceRequestBuilder withCursor( final String cursor ) {
+        this.cursor = Optional.fromNullable( cursor );
+        return this;
+    }
+
+
+    @Override
+    public IndexServiceRequestBuilder withStartTimestamp( final Long timestamp ) {
+        this.updateTimestamp = Optional.fromNullable(timestamp  );
+        return this;
+    }
+
+
+    @Override
+    public Optional<ApplicationScope> getApplicationScope() {
+
+        if(this.withApplicationId.isPresent()){
+            return Optional.of(  CpNamingUtils.getApplicationScope( withApplicationId.get()));
+        }
+
+        return Optional.absent();
+    }
+
+
+    @Override
+    public Optional<String> getCollectionName() {
+        return withCollectionName;
+    }
+
+
+    @Override
+    public Optional<String> getCursor() {
+        return cursor;
+    }
+
+
+    @Override
+    public Optional<Long> getUpdateTimestamp() {
+        return updateTimestamp;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cb179d35/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java
index b878246..672b3c8 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java
@@ -33,5 +33,5 @@ public interface ReIndexAction {
      * Index this entity with the specified scope
      * @param entityIdScope
      */
-    void index( final EntityIdScope entityIdScope );
+    void index( final EntityIndexOperation entityIdScope );
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cb179d35/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java
index e594ad3..b25eca5 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java
@@ -45,16 +45,17 @@ public interface ReIndexService {
     /**
      * Perform an index rebuild
      *
-     * @param appId The applicationId to re-index, or all applications if absent
-     * @param collection The collection name to re-index.  Otherwise all collections in an app will be used.
-     * @param cursor An optional cursor to resume processing
-     * @param startTimestamp The time to start indexing from.  All edges >= this time will be indexed.
+     * @param indexServiceRequestBuilder The builder to build the request
      * @return
      */
-    IndexResponse rebuildIndex( final Optional<UUID> appId, final Optional<String> collection, final Optional<String> cursor,
-                        final Optional<Long> startTimestamp);
+    IndexResponse rebuildIndex(final IndexServiceRequestBuilder indexServiceRequestBuilder);
 
 
+    /**
+     * Generate a build for the index
+     * @return
+     */
+    IndexServiceRequestBuilder getBuilder();
 
     /**
      * The response when requesting a re-index operation

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cb179d35/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
index bd1bff9..a2fa09a 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
@@ -20,7 +20,6 @@
 package org.apache.usergrid.corepersistence.index;
 
 
-import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
@@ -28,7 +27,6 @@ import org.apache.usergrid.corepersistence.rx.impl.AllApplicationsObservable;
 import org.apache.usergrid.corepersistence.rx.impl.AllEntityIdsObservable;
 import org.apache.usergrid.corepersistence.rx.impl.EdgeScope;
 import org.apache.usergrid.corepersistence.util.CpNamingUtils;
-import org.apache.usergrid.corepersistence.util.SerializableMapper;
 import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
 import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
@@ -46,13 +44,11 @@ import com.google.inject.Singleton;
 import rx.Observable;
 import rx.observables.ConnectableObservable;
 
-import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getApplicationScope;
-
 
 @Singleton
 public class ReIndexServiceImpl implements ReIndexService {
 
-    private static final MapScope RESUME_MAP_SCOPTE =
+    private static final MapScope RESUME_MAP_SCOPE =
         new MapScopeImpl( CpNamingUtils.getManagementApplicationId(), "reindexresume" );
 
     //Keep cursors to resume re-index for 1 day.  This is far beyond it's useful real world implications anyway.
@@ -78,31 +74,41 @@ public class ReIndexServiceImpl implements ReIndexService {
         this.rxTaskScheduler = rxTaskScheduler;
         this.indexService = indexService;
 
-        this.mapManager = mapManagerFactory.createMapManager( RESUME_MAP_SCOPTE );
+        this.mapManager = mapManagerFactory.createMapManager( RESUME_MAP_SCOPE );
     }
 
 
 
+
+
     @Override
-    public IndexResponse rebuildIndex( final Optional<UUID> appId, final Optional<String> collection, final Optional<String> cursor,
-                                       final Optional<Long> startTimestamp ) {
+    public IndexResponse rebuildIndex( final IndexServiceRequestBuilder indexServiceRequestBuilder ) {
 
-        //load our last emitted Scope if a cursor is present
-        if ( cursor.isPresent() ) {
+          //load our last emitted Scope if a cursor is present
+        if ( indexServiceRequestBuilder.getCursor().isPresent() ) {
             throw new UnsupportedOperationException( "Build this" );
         }
 
 
-        final Observable<ApplicationScope>  applicationScopes = appId.isPresent()? Observable.just( getApplicationScope(appId.get()) ) : allApplicationsObservable.getData();
+        final Optional<ApplicationScope> appId = indexServiceRequestBuilder.getApplicationScope();
+        final Observable<ApplicationScope>  applicationScopes = appId.isPresent()? Observable.just( appId.get() ) : allApplicationsObservable.getData();
+
+
+
 
         final String newCursor = StringUtils.sanitizeUUID( UUIDGenerator.newTimeUUID() );
 
+        final long modifiedSince = indexServiceRequestBuilder.getUpdateTimestamp().or( Long.MIN_VALUE );
+
         //create an observable that loads each entity and indexes it, start it running with publish
         final ConnectableObservable<EdgeScope> runningReIndex =
-            allEntityIdsObservable.getEdgesToEntities( applicationScopes, collection, startTimestamp )
+            allEntityIdsObservable.getEdgesToEntities( applicationScopes,
+                indexServiceRequestBuilder.getCollectionName() )
 
                 //for each edge, create our scope and index on it
-                .doOnNext( edge -> indexService.index( new EntityIdScope( edge.getApplicationScope(), edge.getEdge().getTargetNode() ) ) ).publish();
+                .doOnNext( edge -> indexService.index(
+                    new EntityIndexOperation( edge.getApplicationScope(), edge.getEdge().getTargetNode(),
+                        modifiedSince ) ) ).publish();
 
 
 
@@ -112,9 +118,9 @@ public class ReIndexServiceImpl implements ReIndexService {
             rxTaskScheduler.getAsyncIOScheduler() )
             .doOnNext( edge -> {
 
-                final String serializedState = SerializableMapper.asString( edge );
-
-                mapManager.putString( newCursor, serializedState, INDEX_TTL );
+//                final String serializedState = SerializableMapper.asString( edge );
+//
+//                mapManager.putString( newCursor, serializedState, INDEX_TTL );
             } ).subscribe();
 
 
@@ -124,6 +130,12 @@ public class ReIndexServiceImpl implements ReIndexService {
 
         return new IndexResponse( newCursor, runningReIndex );
     }
+
+
+    @Override
+    public IndexServiceRequestBuilder getBuilder() {
+        return new IndexServiceRequestBuilderImpl();
+    }
 }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cb179d35/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservable.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservable.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservable.java
index b9e5373..aada240 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservable.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservable.java
@@ -44,9 +44,8 @@ public interface AllEntityIdsObservable {
      * Get all edges that represent edges to entities in the system
      * @param appScopes
      * @param edgeType The edge type to use (if specified)
-     * @param startTime The time to start with
      * @return
      */
-    Observable<EdgeScope> getEdgesToEntities(final Observable<ApplicationScope> appScopes, final Optional<String> edgeType, final Optional<Long> startTime);
+    Observable<EdgeScope> getEdgesToEntities(final Observable<ApplicationScope> appScopes, final Optional<String> edgeType);
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cb179d35/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservableImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservableImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservableImpl.java
index 257fab1..6a95e7b 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservableImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservableImpl.java
@@ -81,12 +81,13 @@ public class AllEntityIdsObservableImpl implements AllEntityIdsObservable {
 
 
     @Override
-    public Observable<EdgeScope> getEdgesToEntities( final Observable<ApplicationScope> appScopes, final Optional<String> edgeType,  final Optional<Long> startTime) {
+    public Observable<EdgeScope> getEdgesToEntities( final Observable<ApplicationScope> appScopes, final Optional<String> edgeType) {
 
         return appScopes.flatMap( applicationScope -> {
             final GraphManager gm = graphManagerFactory.createEdgeManager( applicationScope );
 
-            return edgesObservable.edgesFromSourceAscending( gm, applicationScope.getApplication(),edgeType,  startTime ).map( edge -> new EdgeScope(applicationScope, edge ));
+            return edgesObservable.edgesFromSourceDescending( gm, applicationScope.getApplication(), edgeType )
+                                  .map( edge -> new EdgeScope(applicationScope, edge ));
         } );
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cb179d35/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/SerializableMapper.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/SerializableMapper.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/SerializableMapper.java
deleted file mode 100644
index 19ecf6d..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/SerializableMapper.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.util;
-
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.Serializable;
-import java.nio.charset.StandardCharsets;
-
-import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
-import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.dataformat.smile.SmileFactory;
-import com.google.common.base.Preconditions;
-
-
-/**
- * A simple utility for serializing serializable classes to/and from strings.  To be used for small object storage only, such as resume on re-index
- * storing data such as entities should be specialized.
- */
-public class SerializableMapper {
-
-    private static final SmileFactory SMILE_FACTORY = new SmileFactory();
-
-    private static final ObjectMapper MAPPER = new ObjectMapper( SMILE_FACTORY );
-
-    static{
-        MAPPER.enableDefaultTypingAsProperty( ObjectMapper.DefaultTyping.JAVA_LANG_OBJECT, "@class" );
-        SMILE_FACTORY.delegateToTextual( true );
-    }
-
-    /**
-     * Get value as a string
-     * @param toSerialize
-     * @param <T>
-     * @return
-     */
-    public static <T extends Serializable> String asString(final T toSerialize){
-        try {
-            return MAPPER.writeValueAsString( toSerialize );
-        }
-        catch ( JsonProcessingException e ) {
-            throw new RuntimeException( "Unable to process json", e );
-        }
-    }
-
-
-    /**
-     * Write the value as a string
-     * @param <T>
-     * @param serialized
-     * @param clazz
-     * @return
-     */
-    public static <T extends Serializable> T fromString(final String serialized, final Class<T> clazz){
-        Preconditions.checkNotNull(serialized, "serialized string cannot be null");
-
-
-        InputStream stream = new ByteArrayInputStream(serialized.getBytes( StandardCharsets.UTF_8));
-
-        try {
-            return MAPPER.readValue( stream, clazz );
-        }
-        catch ( IOException e ) {
-            throw new RuntimeException( String.format("Unable to parse string '%s'", serialized), e );
-        }
-    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cb179d35/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java
index 92f2b01..9e84219 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java
@@ -89,7 +89,7 @@ public class EdgesToTargetObservableIT extends AbstractCoreIT {
 
         final GraphManager gm = managerCache.getGraphManager( scope );
 
-        edgesFromSourceObservable.edgesFromSourceAscending( gm, applicationId ).doOnNext( edge -> {
+        edgesFromSourceObservable.edgesFromSourceDescending( gm, applicationId ).doOnNext( edge -> {
             final String edgeType = edge.getType();
             final Id target = edge.getTargetNode();
 
@@ -118,7 +118,7 @@ public class EdgesToTargetObservableIT extends AbstractCoreIT {
 
         //test connections
 
-        edgesFromSourceObservable.edgesFromSourceAscending( gm, source ).doOnNext( edge -> {
+        edgesFromSourceObservable.edgesFromSourceDescending( gm, source ).doOnNext( edge -> {
             final String edgeType = edge.getType();
             final Id target = edge.getTargetNode();
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cb179d35/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java
index 9f0bd60..964e13d 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java
@@ -38,7 +38,7 @@ public interface EdgesObservable {
      * @param sourceNode
      * @return
      */
-    Observable<Edge> edgesFromSourceAscending( final GraphManager gm, final Id sourceNode );
+    Observable<Edge> edgesFromSourceDescending( final GraphManager gm, final Id sourceNode );
 
 
     /**
@@ -46,11 +46,10 @@ public interface EdgesObservable {
      * @param gm
      * @param sourceNode
      * @param edgeType The edge type if specified.  Otherwise all types will be used
-     * @param startTimestamp The start timestamp if specfiied, otherwise Long.MIN will be used
      * @return
      */
-    Observable<Edge> edgesFromSourceAscending( final GraphManager gm, final Id sourceNode,final Optional<String> edgeType,
-                                               final Optional<Long> startTimestamp );
+    Observable<Edge> edgesFromSourceDescending( final GraphManager gm, final Id sourceNode,
+                                                final Optional<String> edgeType );
 
     /**
      * Get all edges from the source node with the target type

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cb179d35/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java
index df9e094..7240798 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java
@@ -55,7 +55,7 @@ public class EdgesObservableImpl implements EdgesObservable {
      * Get all edges from the source
      */
     @Override
-    public Observable<Edge> edgesFromSourceAscending( final GraphManager gm, final Id sourceNode ) {
+    public Observable<Edge> edgesFromSourceDescending( final GraphManager gm, final Id sourceNode ) {
         final Observable<String> edgeTypes =
             gm.getEdgeTypesFromSource( new SimpleSearchEdgeType( sourceNode, null, null ) );
 
@@ -71,8 +71,8 @@ public class EdgesObservableImpl implements EdgesObservable {
 
 
     @Override
-    public Observable<Edge> edgesFromSourceAscending( final GraphManager gm, final Id sourceNode, final Optional<String> edgeTypeInput,
-                                                      final Optional<Long> startTimestamp ) {
+    public Observable<Edge> edgesFromSourceDescending( final GraphManager gm, final Id sourceNode,
+                                                       final Optional<String> edgeTypeInput ) {
 
 
 
@@ -85,7 +85,7 @@ public class EdgesObservableImpl implements EdgesObservable {
                 logger.debug( "Loading edges of edgeType {} from {}", edgeType, sourceNode );
 
                 return gm.loadEdgesFromSource(
-                    new SimpleSearchByEdgeType( sourceNode, edgeType, startTimestamp.or( Long.MIN_VALUE ), SearchByEdgeType.Order.ASCENDING,
+                    new SimpleSearchByEdgeType( sourceNode, edgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
                         Optional.<Edge>absent() ) );
         } );
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cb179d35/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/TargetIdObservableImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/TargetIdObservableImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/TargetIdObservableImpl.java
index 5cf5117..82c7d54 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/TargetIdObservableImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/TargetIdObservableImpl.java
@@ -55,7 +55,7 @@ public class TargetIdObservableImpl implements TargetIdObservable {
     public Observable<Id> getTargetNodes(final GraphManager gm, final Id sourceNode) {
 
         //only search edge types that start with collections
-        return edgesFromSourceObservable.edgesFromSourceAscending( gm, sourceNode ).map( new Func1<Edge, Id>() {
+        return edgesFromSourceObservable.edgesFromSourceDescending( gm, sourceNode ).map( new Func1<Edge, Id>() {
 
 
             @Override

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cb179d35/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/EdgeDataMigrationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/EdgeDataMigrationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/EdgeDataMigrationImpl.java
index 0df26ff..d6c42e3 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/EdgeDataMigrationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/EdgeDataMigrationImpl.java
@@ -87,7 +87,7 @@ public class EdgeDataMigrationImpl implements DataMigration<GraphNode> {
             final GraphManager gm = graphManagerFactory.createEdgeManager( graphNode.applicationScope );
 
             //get edges from the source
-            return edgesFromSourceObservable.edgesFromSourceAscending( gm, graphNode.entryNode ).buffer( 1000 )
+            return edgesFromSourceObservable.edgesFromSourceDescending( gm, graphNode.entryNode ).buffer( 1000 )
                                             .doOnNext( edges -> {
                                                     final MutationBatch batch = keyspace.prepareMutationBatch();
 


[05/10] incubator-usergrid git commit: Merge branch 'two-dot-o-dev' of https://git-wip-us.apache.org/repos/asf/incubator-usergrid into USERGRID-643

Posted by gr...@apache.org.
Merge branch 'two-dot-o-dev' of https://git-wip-us.apache.org/repos/asf/incubator-usergrid into USERGRID-643


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/a767bb61
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/a767bb61
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/a767bb61

Branch: refs/heads/two-dot-o-dev
Commit: a767bb61e68525006202a447717e6592f0dc0c8e
Parents: 20c9b35 5b1dfa1
Author: Todd Nine <tn...@apigee.com>
Authored: Thu May 14 17:19:41 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Thu May 14 17:19:41 2015 -0600

----------------------------------------------------------------------
 .../corepersistence/CpEntityManager.java        | 22 ++---
 .../corepersistence/CpEntityManagerFactory.java |  8 +-
 .../corepersistence/CpRelationManager.java      | 90 ++++++++++----------
 .../corepersistence/index/IndexServiceImpl.java |  6 +-
 .../usergrid/persistence/EntityManager.java     | 34 ++++----
 .../persistence/MultiQueryIterator.java         |  4 +-
 .../apache/usergrid/persistence/PathQuery.java  |  3 +-
 .../usergrid/persistence/RelationManager.java   | 12 +--
 .../usergrid/persistence/CollectionIT.java      |  2 +-
 .../usergrid/persistence/CountingMutatorIT.java |  6 +-
 .../persistence/EntityConnectionsIT.java        | 26 +++---
 .../org/apache/usergrid/persistence/GeoIT.java  |  8 +-
 .../PerformanceEntityRebuildIndexTest.java      |  8 +-
 .../persistence/query/ConnectionHelper.java     |  2 +-
 .../persistence/query/IteratingQueryIT.java     |  2 +-
 .../resources/usergrid-custom-test.properties   |  2 +
 .../persistence/index/impl/IndexingUtils.java   |  2 +-
 .../org/apache/usergrid/rest/ApiResponse.java   | 16 ++++
 .../apache/usergrid/rest/ApiResponseTest.java   | 45 ++++++++++
 .../rest/management/ImportResourceIT.java       |  2 +-
 .../cassandra/ManagementServiceImpl.java        | 23 ++---
 .../management/export/ExportServiceImpl.java    |  4 +-
 .../management/importer/FileImportJob.java      |  5 +-
 .../management/importer/ImportServiceImpl.java  | 15 ++--
 .../services/AbstractConnectionsService.java    | 24 +++---
 .../users/activities/ActivitiesService.java     |  4 +-
 .../users/activities/ActivitiesService.java     |  4 +-
 .../apache/usergrid/management/EmailFlowIT.java |  9 +-
 .../management/importer/ImportCollectionIT.java |  2 +-
 .../importer/ImportConnectionsTest.java         |  9 +-
 .../management/importer/ImportServiceIT.java    |  6 +-
 31 files changed, 224 insertions(+), 181 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a767bb61/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a767bb61/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
----------------------------------------------------------------------


[06/10] incubator-usergrid git commit: Fixes bugs and cleans up tests

Posted by gr...@apache.org.
Fixes bugs and cleans up tests


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/48be894f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/48be894f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/48be894f

Branch: refs/heads/two-dot-o-dev
Commit: 48be894fb8af857f33700fcc2717357936d12913
Parents: a767bb6
Author: Todd Nine <tn...@apigee.com>
Authored: Thu May 14 19:23:04 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Thu May 14 19:23:04 2015 -0600

----------------------------------------------------------------------
 .../usergrid/corepersistence/CoreModule.java    |   4 +
 .../corepersistence/CpRelationManager.java      |   1 -
 .../index/IndexProcessorFig.java                |   2 +-
 .../index/IndexServiceRequestBuilderImpl.java   |  74 ++---
 .../corepersistence/index/ReIndexService.java   |  16 +-
 .../index/ReIndexServiceImpl.java               |  48 ++-
 .../cursor/AbstractCursorSerializer.java        |   2 +-
 .../PerformanceEntityRebuildIndexTest.java      | 318 +++++++++----------
 8 files changed, 225 insertions(+), 240 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/48be894f/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
index a02bffd..b22a7cb 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
@@ -25,6 +25,8 @@ import org.apache.usergrid.corepersistence.asyncevents.EventBuilderImpl;
 import org.apache.usergrid.corepersistence.index.IndexProcessorFig;
 import org.apache.usergrid.corepersistence.index.IndexService;
 import org.apache.usergrid.corepersistence.index.IndexServiceImpl;
+import org.apache.usergrid.corepersistence.index.ReIndexService;
+import org.apache.usergrid.corepersistence.index.ReIndexServiceImpl;
 import org.apache.usergrid.corepersistence.migration.AppInfoMigrationPlugin;
 import org.apache.usergrid.corepersistence.migration.CoreMigration;
 import org.apache.usergrid.corepersistence.migration.CoreMigrationPlugin;
@@ -142,6 +144,8 @@ public class CoreModule  extends AbstractModule {
         bind( AsyncEventService.class ).toProvider( AsyncIndexProvider.class );
 
 
+        bind( ReIndexService.class).to( ReIndexServiceImpl.class );
+
 
         install( new GuicyFigModule( IndexProcessorFig.class ) );
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/48be894f/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index 4993d88..cba1a07 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@ -661,7 +661,6 @@ public class CpRelationManager implements RelationManager {
         }while (!found && length <= maxLength);
         if(logger.isInfoEnabled()){
             logger.info(String.format("Consistent Search finished in %s,  results=%s, expected=%s...dumping stack",length, results.size(),expectedResults));
-            Thread.dumpStack();
         }
         return results;
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/48be894f/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
index 8e835e2..e4b2329 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
@@ -78,7 +78,7 @@ public interface IndexProcessorFig extends GuicyFig {
     String getQueueImplementation();
 
 
-    @Default("10000")
+    @Default("1000")
     @Key("elasticsearch.reindex.flush.interval")
     int getUpdateInterval();
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/48be894f/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceRequestBuilderImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceRequestBuilderImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceRequestBuilderImpl.java
index 3466674..4017b6e 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceRequestBuilderImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceRequestBuilderImpl.java
@@ -22,70 +22,57 @@ package org.apache.usergrid.corepersistence.index;
 
 import java.util.UUID;
 
-
 import org.apache.usergrid.corepersistence.util.CpNamingUtils;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 
 import com.google.common.base.Optional;
 
 
+/**
+ * Index service request builder
+ */
 public class IndexServiceRequestBuilderImpl implements IndexServiceRequestBuilder {
 
-
-    /**
-     *
-     final Observable<ApplicationScope>  applicationScopes = appId.isPresent()? Observable.just( getApplicationScope(appId.get()) ) : allApplicationsObservable.getData();
-
-     final String newCursor = StringUtils.sanitizeUUID( UUIDGenerator.newTimeUUID() );
-
-     //create an observable that loads each entity and indexes it, start it running with publish
-     final ConnectableObservable<EdgeScope> runningReIndex =
-         allEntityIdsObservable.getEdgesToEntities( applicationScopes, collection, startTimestamp )
-
-             //for each edge, create our scope and index on it
-             .doOnNext( edge -> indexService.index( new EntityIdScope( edge.getApplicationScope(), edge.getEdge().getTargetNode() ) ) ).publish();
-
-
-
-     //start our sampler and state persistence
-     //take a sample every sample interval to allow us to resume state with minimal loss
-     runningReIndex.sample( indexProcessorFig.getReIndexSampleInterval(), TimeUnit.MILLISECONDS,
-         rxTaskScheduler.getAsyncIOScheduler() )
-         .doOnNext( edge -> {
-
-             final String serializedState = SerializableMapper.asString( edge );
-
-             mapManager.putString( newCursor, serializedState, INDEX_TTL );
-         } ).subscribe();
-
-
-     */
-
-    private Optional<UUID> withApplicationId;
-    private Optional<String> withCollectionName;
-    private Optional<String> cursor;
-    private Optional<Long> updateTimestamp;
+    private Optional<UUID> withApplicationId = Optional.absent();
+    private Optional<String> withCollectionName = Optional.absent();
+    private Optional<String> cursor = Optional.absent();
+    private Optional<Long> updateTimestamp = Optional.absent();
 
 
     /***
      *
-     * @param applicationId
+     * @param applicationId The application id
      * @return
      */
     @Override
     public IndexServiceRequestBuilder withApplicationId( final UUID applicationId ) {
-        this.withApplicationId = Optional.fromNullable(applicationId);
+        this.withApplicationId = Optional.fromNullable( applicationId );
         return this;
     }
 
 
+    /**
+     * the colleciton name
+     * @param collectionName
+     * @return
+     */
     @Override
     public IndexServiceRequestBuilder withCollection( final String collectionName ) {
-        this.withCollectionName = Optional.fromNullable( collectionName );
+        if(collectionName == null){
+            this.withCollectionName = Optional.absent();
+        }
+        else {
+            this.withCollectionName = Optional.fromNullable( CpNamingUtils.getEdgeTypeFromCollectionName( collectionName ) );
+        }
         return this;
     }
 
 
+    /**
+     * The cursor
+     * @param cursor
+     * @return
+     */
     @Override
     public IndexServiceRequestBuilder withCursor( final String cursor ) {
         this.cursor = Optional.fromNullable( cursor );
@@ -93,9 +80,14 @@ public class IndexServiceRequestBuilderImpl implements IndexServiceRequestBuilde
     }
 
 
+    /**
+     * Set start timestamp in epoch time.  Only entities updated since this time will be processed for indexing
+     * @param timestamp
+     * @return
+     */
     @Override
     public IndexServiceRequestBuilder withStartTimestamp( final Long timestamp ) {
-        this.updateTimestamp = Optional.fromNullable(timestamp  );
+        this.updateTimestamp = Optional.fromNullable( timestamp );
         return this;
     }
 
@@ -103,8 +95,8 @@ public class IndexServiceRequestBuilderImpl implements IndexServiceRequestBuilde
     @Override
     public Optional<ApplicationScope> getApplicationScope() {
 
-        if(this.withApplicationId.isPresent()){
-            return Optional.of(  CpNamingUtils.getApplicationScope( withApplicationId.get()));
+        if ( this.withApplicationId.isPresent() ) {
+            return Optional.of( CpNamingUtils.getApplicationScope( withApplicationId.get() ) );
         }
 
         return Optional.absent();

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/48be894f/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java
index f8955dd..af3615e 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java
@@ -31,7 +31,7 @@ public interface ReIndexService {
      *
      * @param indexServiceRequestBuilder The builder to build the request
      */
-    IndexResponse rebuildIndex( final IndexServiceRequestBuilder indexServiceRequestBuilder );
+    ReIndexStatus rebuildIndex( final IndexServiceRequestBuilder indexServiceRequestBuilder );
 
 
     /**
@@ -45,20 +45,20 @@ public interface ReIndexService {
      * @param jobId The jobId returned during the rebuild index
      * @return
      */
-    IndexResponse getStatus( final String jobId );
+    ReIndexStatus getStatus( final String jobId );
 
 
     /**
      * The response when requesting a re-index operation
      */
-    class IndexResponse {
+    class ReIndexStatus {
         final String jobId;
-        final String status;
+        final Status status;
         final long numberProcessed;
         final long lastUpdated;
 
 
-        public IndexResponse( final String jobId, final String status, final long numberProcessed,
+        public ReIndexStatus( final String jobId, final Status status, final long numberProcessed,
                               final long lastUpdated ) {
             this.jobId = jobId;
             this.status = status;
@@ -97,8 +97,12 @@ public interface ReIndexService {
          * Get the status
          * @return
          */
-        public String getStatus() {
+        public Status getStatus() {
             return status;
         }
     }
+
+    enum Status{
+        STARTED, INPROGRESS, COMPLETE;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/48be894f/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
index d828fc2..f44113b 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
@@ -22,6 +22,10 @@ package org.apache.usergrid.corepersistence.index;
 
 import java.util.List;
 
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
 import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializerUtil;
 import org.apache.usergrid.corepersistence.pipeline.read.CursorSeek;
@@ -51,6 +55,8 @@ import rx.schedulers.Schedulers;
 @Singleton
 public class ReIndexServiceImpl implements ReIndexService {
 
+    private static final Logger logger = LoggerFactory.getLogger( ReIndexServiceImpl.class );
+
     private static final MapScope RESUME_MAP_SCOPE =
         new MapScopeImpl( CpNamingUtils.getManagementApplicationId(), "reindexresume" );
 
@@ -85,7 +91,7 @@ public class ReIndexServiceImpl implements ReIndexService {
 
 
     @Override
-    public IndexResponse rebuildIndex( final IndexServiceRequestBuilder indexServiceRequestBuilder ) {
+    public ReIndexStatus rebuildIndex( final IndexServiceRequestBuilder indexServiceRequestBuilder ) {
 
         //load our last emitted Scope if a cursor is present
 
@@ -97,7 +103,7 @@ public class ReIndexServiceImpl implements ReIndexService {
         final Optional<ApplicationScope> appId = indexServiceRequestBuilder.getApplicationScope();
 
 
-        Preconditions.checkArgument( cursor.isPresent() && appId.isPresent(),
+        Preconditions.checkArgument( !(cursor.isPresent() && appId.isPresent()),
             "You cannot specify an app id and a cursor.  When resuming with cursor you must omit the appid" );
 
         final Observable<ApplicationScope> applicationScopes = getApplications( cursor, appId );
@@ -112,9 +118,14 @@ public class ReIndexServiceImpl implements ReIndexService {
             indexServiceRequestBuilder.getCollectionName(), cursorSeek.getSeekValue() )
 
             //for each edge, create our scope and index on it
-            .doOnNext( edge -> indexService.index(
-                new EntityIndexOperation( edge.getApplicationScope(), edge.getEdge().getTargetNode(),
-                    modifiedSince ) ) );
+            .doOnNext( edge -> {
+                final EntityIndexOperation entityIndexOperation = new EntityIndexOperation( edge.getApplicationScope(), edge.getEdge().getTargetNode(), modifiedSince );
+
+                logger.info( "Queueing {}", entityIndexOperation );
+
+                indexService.index(entityIndexOperation);
+
+            } );
 
 
         //start our sampler and state persistence
@@ -127,7 +138,7 @@ public class ReIndexServiceImpl implements ReIndexService {
             .subscribeOn( Schedulers.io() ).subscribe();
 
 
-        return new IndexResponse( jobId, "Started", 0, 0 );
+        return new ReIndexStatus( jobId, Status.STARTED, 0, 0 );
     }
 
 
@@ -138,7 +149,7 @@ public class ReIndexServiceImpl implements ReIndexService {
 
 
     @Override
-    public IndexResponse getStatus( final String jobId ) {
+    public ReIndexStatus getStatus( final String jobId ) {
         Preconditions.checkNotNull( jobId, "jobId must not be null" );
         return getIndexResponse( jobId );
     }
@@ -166,11 +177,11 @@ public class ReIndexServiceImpl implements ReIndexService {
                 writeCursorState( jobId, buffer.get( buffer.size() - 1 ) );
             }
 
-            writeStateMeta( jobId, "InProgress", count, System.currentTimeMillis() );
+            writeStateMeta( jobId, Status.INPROGRESS, count, System.currentTimeMillis() );
         }
 
         public void complete(){
-            writeStateMeta( jobId, "Complete", count, System.currentTimeMillis() );
+            writeStateMeta( jobId, Status.COMPLETE, count, System.currentTimeMillis() );
         }
     }
 
@@ -257,10 +268,15 @@ public class ReIndexServiceImpl implements ReIndexService {
      * @param processedCount
      * @param lastUpdated
      */
-    private void writeStateMeta( final String jobId, final String status, final long processedCount,
+    private void writeStateMeta( final String jobId, final Status status, final long processedCount,
                                  final long lastUpdated ) {
 
-        mapManager.putString( jobId + MAP_STATUS_KEY, status );
+        if(logger.isDebugEnabled()) {
+            logger.debug( "Flushing state for jobId {}, status {}, processedCount {}, lastUpdated {}",
+                new Object[] { jobId, status, processedCount, lastUpdated } );
+        }
+
+        mapManager.putString( jobId + MAP_STATUS_KEY, status.name() );
         mapManager.putLong( jobId + MAP_COUNT_KEY, processedCount );
         mapManager.putLong( jobId + MAP_UPDATED_KEY, lastUpdated );
     }
@@ -271,18 +287,20 @@ public class ReIndexServiceImpl implements ReIndexService {
      * @param jobId
      * @return
      */
-    private IndexResponse getIndexResponse( final String jobId ) {
+    private ReIndexStatus getIndexResponse( final String jobId ) {
 
-        final String status = mapManager.getString( jobId+MAP_STATUS_KEY );
+        final String stringStatus = mapManager.getString( jobId+MAP_STATUS_KEY );
 
-        if(status == null){
+        if(stringStatus == null){
             throw new IllegalArgumentException( "Could not find a job with id " + jobId );
         }
 
+        final Status status = Status.valueOf( stringStatus );
+
         final long processedCount = mapManager.getLong( jobId + MAP_COUNT_KEY );
         final long lastUpdated = mapManager.getLong( jobId + MAP_COUNT_KEY );
 
-        return new IndexResponse( jobId, status, processedCount, lastUpdated );
+        return new ReIndexStatus( jobId, status, processedCount, lastUpdated );
     }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/48be894f/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/AbstractCursorSerializer.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/AbstractCursorSerializer.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/AbstractCursorSerializer.java
index 23bb99a..e770a77 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/AbstractCursorSerializer.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/AbstractCursorSerializer.java
@@ -44,7 +44,7 @@ public abstract class AbstractCursorSerializer<T> implements CursorSerializer<T>
         try {
             final Class<? extends T> classType = getType();
 
-            return objectMapper.treeToValue( node, classType );
+             return objectMapper.treeToValue( node, classType );
         }
         catch ( JsonProcessingException e ) {
             throw new CursorParseException( "Unable to deserialize value", e );

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/48be894f/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java b/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
index 55c4846..8d54043 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
@@ -22,78 +22,67 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
-import java.util.concurrent.TimeUnit;
 
-import com.google.common.base.Optional;
-import org.apache.commons.lang.RandomStringUtils;
-
-import org.apache.usergrid.corepersistence.index.IndexServiceRequestBuilder;
-import org.apache.usergrid.corepersistence.index.ReIndexService;
-import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.commons.lang.RandomStringUtils;
+
 import org.apache.usergrid.AbstractCoreIT;
 import org.apache.usergrid.cassandra.SpringResource;
+import org.apache.usergrid.corepersistence.index.IndexServiceRequestBuilder;
+import org.apache.usergrid.corepersistence.index.ReIndexService;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
+import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
 import org.apache.usergrid.persistence.index.EntityIndexFactory;
 import org.apache.usergrid.persistence.model.entity.Id;
 import org.apache.usergrid.persistence.model.entity.SimpleId;
 
-import com.codahale.metrics.Meter;
 import com.codahale.metrics.MetricRegistry;
-import com.codahale.metrics.Slf4jReporter;
 import com.google.inject.Injector;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.fail;
 
 
 //@RunWith(JukitoRunner.class)
 //@UseModules({ GuiceModule.class })
 
+
 public class PerformanceEntityRebuildIndexTest extends AbstractCoreIT {
-    private static final Logger logger = LoggerFactory.getLogger(PerformanceEntityRebuildIndexTest.class );
+    private static final Logger logger = LoggerFactory.getLogger( PerformanceEntityRebuildIndexTest.class );
 
     private static final MetricRegistry registry = new MetricRegistry();
-    private Slf4jReporter reporter;
-
-    private static final int ENTITIES_TO_INDEX = 2000;
 
 
+    private static final int ENTITIES_TO_INDEX = 1000;
 
 
     @Before
     public void startReporting() {
 
-        logger.debug("Starting metrics reporting");
-        reporter = Slf4jReporter.forRegistry( registry ).outputTo( logger )
-                .convertRatesTo( TimeUnit.SECONDS )
-                .convertDurationsTo( TimeUnit.MILLISECONDS ).build();
-
-        reporter.start( 10, TimeUnit.SECONDS );
+        logger.debug( "Starting metrics reporting" );
     }
 
 
     @After
     public void printReport() {
-        logger.debug("Printing metrics report");
-        reporter.report();
-        reporter.stop();
+        logger.debug( "Printing metrics report" );
     }
 
 
-    @Test
+    @Test( timeout = 120000 )
     public void rebuildOneCollectionIndex() throws Exception {
 
-        logger.info("Started rebuildIndex()");
+        logger.info( "Started rebuildIndex()" );
 
-        String rand = RandomStringUtils.randomAlphanumeric(5);
-        final UUID appId = setup.createApplication("org_" + rand, "app_" + rand);
+        String rand = RandomStringUtils.randomAlphanumeric( 5 );
+        final UUID appId = setup.createApplication( "org_" + rand, "app_" + rand );
 
         final EntityManager em = setup.getEmf().getEntityManager( appId );
 
@@ -102,109 +91,80 @@ public class PerformanceEntityRebuildIndexTest extends AbstractCoreIT {
         // ----------------- create a bunch of entities
 
         Map<String, Object> entityMap = new HashMap<String, Object>() {{
-            put("key1", 1000 );
-            put("key2", 2000 );
-            put("key3", "Some value");
+            put( "key1", 1000 );
+            put( "key2", 2000 );
+            put( "key3", "Some value" );
         }};
 
 
         List<EntityRef> entityRefs = new ArrayList<EntityRef>();
-        int herderCount  = 0;
+        int herderCount = 0;
         int shepardCount = 0;
-        for (int i = 0; i < ENTITIES_TO_INDEX; i++) {
+        for ( int i = 0; i < ENTITIES_TO_INDEX; i++ ) {
 
             final Entity entity;
 
             try {
-                entityMap.put("key", i );
+                entityMap.put( "key", i );
 
                 if ( i % 2 == 0 ) {
-                    entity = em.create("catherder", entityMap);
+                    entity = em.create( "catherder", entityMap );
                     herderCount++;
-                } else {
-                    entity = em.create("catshepard", entityMap);
+                }
+                else {
+                    entity = em.create( "catshepard", entityMap );
                     shepardCount++;
                 }
-
-                app.refreshIndex();
-
-//                em.createConnection(entity, "herds", cat1);
-//                em.createConnection(entity, "herds", cat2);
-//                em.createConnection(entity, "herds", cat3);
-
-            } catch (Exception ex) {
-                throw new RuntimeException("Error creating entity", ex);
+            }
+            catch ( Exception ex ) {
+                throw new RuntimeException( "Error creating entity", ex );
             }
 
-            entityRefs.add(new SimpleEntityRef( entity.getType(), entity.getUuid() ) );
+            entityRefs.add( new SimpleEntityRef( entity.getType(), entity.getUuid() ) );
             if ( i % 10 == 0 ) {
-                logger.info("Created {} entities", i );
+                logger.info( "Created {} entities", i );
             }
-
         }
 
-        logger.info("Created {} entities", ENTITIES_TO_INDEX);
+        logger.info( "Created {} entities", ENTITIES_TO_INDEX );
         app.refreshIndex();
 
         // ----------------- test that we can read them, should work fine
 
-        logger.debug("Read the data");
-        readData( em, "catherders", herderCount, 0);
-        readData( em, "catshepards", shepardCount, 0);
+        logger.debug( "Read the data" );
+        readData( em, "catherders", herderCount, 0 );
+        readData( em, "catshepards", shepardCount, 0 );
 
         // ----------------- delete the system and application indexes
 
-        logger.debug("Deleting apps");
+        logger.debug( "Deleting apps" );
         deleteIndex( em.getApplicationId() );
 
         // ----------------- test that we can read them, should fail
 
-        logger.debug("Reading data, should fail this time ");
-        try {
-            readData( em,  "testTypes", ENTITIES_TO_INDEX, 0 );
-            fail("should have failed to read data");
+        logger.debug( "Reading data, should fail this time " );
 
-        } catch (Exception expected) {}
+        //should be no data
+        readData( em, "testTypes", 0, 0 );
 
-//        ----------------- rebuild index for catherders only
 
-        logger.debug("Preparing to rebuild all indexes");;
+        //        ----------------- rebuild index for catherders only
 
-        final String meterName = this.getClass().getSimpleName() + ".rebuildIndex";
-        final Meter meter = registry.meter( meterName );
+        logger.debug( "Preparing to rebuild all indexes" );
 
-        EntityManagerFactory.ProgressObserver po = new EntityManagerFactory.ProgressObserver() {
-            int counter = 0;
 
-            @Override
-            public void onProgress( final EntityRef entity ) {
-
-                meter.mark();
-                logger.debug("Indexing {}:{}", entity.getType(), entity.getUuid());
-                if ( counter % 100 == 0 ) {
-                    logger.info("Reindexed {} entities", counter );
-                }
-                counter++;
-            }
+        final IndexServiceRequestBuilder builder =
+            reIndexService.getBuilder().withApplicationId( em.getApplicationId() ).withCollection( "catherders" );
 
+        ReIndexService.ReIndexStatus status = reIndexService.rebuildIndex( builder );
 
+        assertNotNull( status.getJobId(), "JobId is present" );
 
-        };
-
-        try {
+        logger.info( "Rebuilt index" );
 
-            final IndexServiceRequestBuilder builder = reIndexService.getBuilder().withApplicationId( em.getApplicationId() ).withCollection( "catherders" );
 
-            reIndexService.rebuildIndex(builder );
+        waitForRebuild( status, reIndexService );
 
-            reporter.report();
-            registry.remove( meterName );
-            logger.info("Rebuilt index");
-
-        } catch (Exception ex) {
-            logger.error("Error rebuilding index", ex);
-            fail();
-        }
 
         // ----------------- test that we can read the catherder collection and not the catshepard
 
@@ -213,78 +173,79 @@ public class PerformanceEntityRebuildIndexTest extends AbstractCoreIT {
     }
 
 
-    @Test
+    @Test( timeout = 120000 )
     public void rebuildIndex() throws Exception {
 
-        logger.info("Started rebuildIndex()");
+        logger.info( "Started rebuildIndex()" );
+
+        String rand = RandomStringUtils.randomAlphanumeric( 5 );
+        final UUID appId = setup.createApplication( "org_" + rand, "app_" + rand );
 
-        String rand = RandomStringUtils.randomAlphanumeric(5);
-        final UUID appId = setup.createApplication("org_" + rand, "app_" + rand);
+        final EntityManager em = setup.getEmf().getEntityManager( appId );
 
-        final EntityManager em = setup.getEmf().getEntityManager(appId);
+        final ReIndexService reIndexService = setup.getInjector().getInstance( ReIndexService.class );
 
         // ----------------- create a bunch of entities
 
         Map<String, Object> entityMap = new HashMap<String, Object>() {{
-            put("key1", 1000 );
-            put("key2", 2000 );
-            put("key3", "Some value");
+            put( "key1", 1000 );
+            put( "key2", 2000 );
+            put( "key3", "Some value" );
         }};
         Map<String, Object> cat1map = new HashMap<String, Object>() {{
-            put("name", "enzo");
-            put("color", "orange");
+            put( "name", "enzo" );
+            put( "color", "orange" );
         }};
         Map<String, Object> cat2map = new HashMap<String, Object>() {{
-            put("name", "marquee");
-            put("color", "grey");
+            put( "name", "marquee" );
+            put( "color", "grey" );
         }};
         Map<String, Object> cat3map = new HashMap<String, Object>() {{
-            put("name", "bertha");
-            put("color", "tabby");
+            put( "name", "bertha" );
+            put( "color", "tabby" );
         }};
 
-        Entity cat1 = em.create("cat", cat1map );
-        Entity cat2 = em.create("cat", cat2map );
-        Entity cat3 = em.create("cat", cat3map );
+        Entity cat1 = em.create( "cat", cat1map );
+        Entity cat2 = em.create( "cat", cat2map );
+        Entity cat3 = em.create( "cat", cat3map );
 
-        List<EntityRef> entityRefs = new ArrayList<EntityRef>();
-        int entityCount = 0;
-        for (int i = 0; i < ENTITIES_TO_INDEX; i++) {
+        List<EntityRef> entityRefs = new ArrayList<>();
+
+        for ( int i = 0; i < ENTITIES_TO_INDEX; i++ ) {
 
             final Entity entity;
 
             try {
-                entityMap.put("key", entityCount );
-                entity = em.create("testType", entityMap );
+                entityMap.put( "key", i );
+                entity = em.create( "testType", entityMap );
 
 
-                em.createConnection(entity, "herds", cat1);
-                em.createConnection(entity, "herds", cat2);
-                em.createConnection(entity, "herds", cat3);
-
-            } catch (Exception ex) {
-                throw new RuntimeException("Error creating entity", ex);
+                em.createConnection( entity, "herds", cat1 );
+                em.createConnection( entity, "herds", cat2 );
+                em.createConnection( entity, "herds", cat3 );
             }
-
-            entityRefs.add(new SimpleEntityRef( entity.getType(), entity.getUuid() ) );
-            if ( entityCount % 10 == 0 ) {
-                logger.info("Created {} entities", entityCount );
+            catch ( Exception ex ) {
+                throw new RuntimeException( "Error creating entity", ex );
             }
 
+            entityRefs.add( new SimpleEntityRef( entity.getType(), entity.getUuid() ) );
+            if ( i % 10 == 0 ) {
+                logger.info( "Created {} entities", i );
+            }
         }
 
-        logger.info("Created {} entities", entityCount);
+        logger.info( "Created {} entities", ENTITIES_TO_INDEX );
         app.refreshIndex();
-        Thread.sleep(10000);
 
         // ----------------- test that we can read them, should work fine
 
-        logger.debug("Read the data");
-        readData( em, "testType", entityCount, 3 );
+        logger.debug( "Read the data" );
+        final String collectionName = "testtypes";
+        readData( em, collectionName, ENTITIES_TO_INDEX, 3 );
 
         // ----------------- delete the system and application indexes
 
-        logger.debug("Deleting app index");
+        logger.debug( "Deleting app index" );
 
         deleteIndex( em.getApplicationId() );
 
@@ -295,61 +256,73 @@ public class PerformanceEntityRebuildIndexTest extends AbstractCoreIT {
 
         // ----------------- test that we can read them, should fail
 
-        logger.debug("Reading data, should fail this time ");
-        try {
-            readData( em, "testTypes", entityCount, 3 );
-            fail("should have failed to read data");
+        logger.debug( "Reading data, should fail this time " );
+
+        readData( em, collectionName, 0, 0 );
+
 
-        } catch (Exception expected) {}
 
         // ----------------- rebuild index
 
-        logger.debug("Preparing to rebuild all indexes");;
+        logger.debug( "Preparing to rebuild all indexes" );
+        ;
 
-        final String meterName = this.getClass().getSimpleName() + ".rebuildIndex";
-        final Meter meter = registry.meter( meterName );
 
-        EntityManagerFactory.ProgressObserver po = new EntityManagerFactory.ProgressObserver() {
-            int counter = 0;
+        try {
 
-            @Override
-            public void onProgress( final EntityRef entity ) {
+            final IndexServiceRequestBuilder builder =
+                reIndexService.getBuilder().withApplicationId( em.getApplicationId() );
 
-                meter.mark();
-                logger.debug("Indexing {}:{}", entity.getType(), entity.getUuid());
-                if ( counter % 100 == 0 ) {
-                    logger.info("Reindexed {} entities", counter );
-                }
-                counter++;
-            }
+            ReIndexService.ReIndexStatus status = reIndexService.rebuildIndex( builder );
 
+            assertNotNull( status.getJobId(), "JobId is present" );
 
-        };
+            logger.info( "Rebuilt index" );
 
-        try {
 
-            fail( "Implement index rebuild" );
-//            setup.getEmf().rebuildInternalIndexes( po );
-//
-//            setup.getEmf().rebuildApplicationIndexes( em.getApplicationId(), po );
+            waitForRebuild( status, reIndexService );
 
-            reporter.report();
-            registry.remove( meterName );
-            logger.info("Rebuilt index");
 
-            app.refreshIndex();
+            logger.info( "Rebuilt index" );
 
-        } catch (Exception ex) {
-            logger.error("Error rebuilding index", ex);
+            app.refreshIndex();
+        }
+        catch ( Exception ex ) {
+            logger.error( "Error rebuilding index", ex );
             fail();
         }
 
         // ----------------- test that we can read them
 
-        Thread.sleep(2000);
-        readData( em, "testTypes", entityCount, 3 );
+        Thread.sleep( 2000 );
+        readData( em, collectionName, ENTITIES_TO_INDEX, 3 );
+    }
+
+
+    /**
+     * Wait for the rebuild to occur
+     */
+    private void waitForRebuild( final ReIndexService.ReIndexStatus status, final ReIndexService reIndexService )
+        throws InterruptedException {
+        while ( true ) {
+
+            try {
+                final ReIndexService.ReIndexStatus updatedStatus = reIndexService.getStatus( status.getJobId() );
+
+                if ( updatedStatus.getStatus() == ReIndexService.Status.COMPLETE ) {
+                    break;
+                }
+            }
+            catch ( IllegalArgumentException iae ) {
+                //swallow
+            }
+
+
+            Thread.sleep( 1000 );
+        }
     }
 
+
     /**
      * Delete app index
      */
@@ -360,54 +333,49 @@ public class PerformanceEntityRebuildIndexTest extends AbstractCoreIT {
 
         Id appId = new SimpleId( appUuid, Schema.TYPE_APPLICATION );
         ApplicationScope scope = new ApplicationScopeImpl( appId );
-        ApplicationEntityIndex ei = eif.createApplicationEntityIndex(scope);
+        ApplicationEntityIndex ei = eif.createApplicationEntityIndex( scope );
 
-        ei.deleteApplication().toBlocking().lastOrDefault(null);
+        ei.deleteApplication().toBlocking().lastOrDefault( null );
         app.refreshIndex();
-
     }
 
 
-    private int readData( EntityManager em,
-        String collectionName, int expectedEntities, int expectedConnections ) throws Exception {
+    private int readData( EntityManager em, String collectionName, int expectedEntities, int expectedConnections )
+        throws Exception {
 
         app.refreshIndex();
 
-        Query q = Query.fromQL("select * where key1=1000");
-        q.setLimit(40);
-        Results results = em.searchCollectionConsistent( em.getApplicationRef(), collectionName, q,expectedEntities );
+        Query q = Query.fromQL( "select * where key1=1000" ).withLimit( 1000 );
+        Results results = em.searchCollectionConsistent( em.getApplicationRef(), collectionName, q, expectedEntities );
 
         int count = 0;
         while ( true ) {
 
             for ( Entity e : results.getEntities() ) {
 
-                assertEquals( 2000, e.getProperty("key2"));
+                assertEquals( 2000, e.getProperty( "key2" ) );
 
-                Results catResults = em.searchTargetEntities(e,
-                    Query.fromQL("select *").setConnectionType("herds"));
+                Results catResults =
+                    em.searchTargetEntities( e, Query.fromQL( "select *" ).setConnectionType( "herds" ) );
                 assertEquals( expectedConnections, catResults.size() );
 
                 if ( count % 100 == 0 ) {
-                    logger.info( "read {} entities", count);
+                    logger.info( "read {} entities", count );
                 }
                 count++;
             }
 
             if ( results.hasCursor() ) {
-                logger.info( "Counted {} : query again with cursor", count);
+                logger.info( "Counted {} : query again with cursor", count );
                 q.setCursor( results.getCursor() );
                 results = em.searchCollection( em.getApplicationRef(), collectionName, q );
-
-            } else {
+            }
+            else {
                 break;
             }
         }
 
-        if ( expectedEntities != -1 && expectedEntities != count ) {
-            throw new RuntimeException("Did not get expected "
-                + expectedEntities + " entities, instead got " + count );
-        }
+        assertEquals("Did not get expected entities", expectedEntities, count);
         return count;
     }
 }