You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by gr...@apache.org on 2015/05/26 20:13:03 UTC

[04/22] incubator-usergrid git commit: Refactored observable methods to correct name

Refactored observable methods to correct name

Added timestamp to message so that consumers can filter values for faster processing


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/cb179d35
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/cb179d35
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/cb179d35

Branch: refs/heads/USERGRID-608
Commit: cb179d3512952203d20c00773789df23e27f147d
Parents: b1d9ac2
Author: Todd Nine <tn...@apigee.com>
Authored: Tue May 12 17:40:20 2015 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Tue May 12 17:40:20 2015 -0700

----------------------------------------------------------------------
 .../asyncevents/EventBuilder.java               |   6 +-
 .../asyncevents/EventBuilderImpl.java           |  18 ++-
 .../asyncevents/InMemoryAsyncEventService.java  |   5 +-
 .../asyncevents/SQSAsyncEventService.java       |   9 +-
 .../index/EntityIndexOperation.java             |  46 +++++++
 .../index/IndexServiceRequestBuilder.java       |  88 +++++++++++++
 .../index/IndexServiceRequestBuilderImpl.java   | 130 +++++++++++++++++++
 .../corepersistence/index/ReIndexAction.java    |   2 +-
 .../corepersistence/index/ReIndexService.java   |  13 +-
 .../index/ReIndexServiceImpl.java               |  44 ++++---
 .../rx/impl/AllEntityIdsObservable.java         |   3 +-
 .../rx/impl/AllEntityIdsObservableImpl.java     |   5 +-
 .../util/SerializableMapper.java                |  91 -------------
 .../rx/EdgesToTargetObservableIT.java           |   4 +-
 .../graph/serialization/EdgesObservable.java    |   7 +-
 .../serialization/impl/EdgesObservableImpl.java |   8 +-
 .../impl/TargetIdObservableImpl.java            |   2 +-
 .../impl/migration/EdgeDataMigrationImpl.java   |   2 +-
 18 files changed, 342 insertions(+), 141 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cb179d35/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java
index f48451c..f9f157e 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java
@@ -22,8 +22,8 @@ package org.apache.usergrid.corepersistence.asyncevents;
 
 import java.util.List;
 
+import org.apache.usergrid.corepersistence.index.EntityIndexOperation;
 import org.apache.usergrid.persistence.collection.MvccLogEntry;
-import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.graph.Edge;
 import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
@@ -72,10 +72,10 @@ public interface EventBuilder {
 
     /**
      * Re-index an entity in the scope provided
-     * @param entityIdScope
+     * @param entityIndexOperation
      * @return
      */
-    Observable<IndexOperationMessage> index( EntityIdScope entityIdScope );
+    Observable<IndexOperationMessage> index( EntityIndexOperation entityIndexOperation );
 
     /**
      * A bean to hold both our observables so the caller can choose the subscription mechanism.  Note that

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cb179d35/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
index c0d82d2..d35ed6d 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
@@ -25,12 +25,13 @@ import java.util.List;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.usergrid.corepersistence.index.EntityIndexOperation;
 import org.apache.usergrid.corepersistence.index.IndexService;
+import org.apache.usergrid.persistence.Schema;
 import org.apache.usergrid.persistence.collection.EntityCollectionManager;
 import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
 import org.apache.usergrid.persistence.collection.MvccLogEntry;
 import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
-import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
 import org.apache.usergrid.persistence.graph.Edge;
 import org.apache.usergrid.persistence.graph.GraphManager;
@@ -38,6 +39,7 @@ import org.apache.usergrid.persistence.graph.GraphManagerFactory;
 import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
 import org.apache.usergrid.persistence.model.entity.Entity;
 import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.field.Field;
 
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
@@ -140,14 +142,20 @@ public class EventBuilderImpl implements EventBuilder {
 
 
     @Override
-    public Observable<IndexOperationMessage> index( final EntityIdScope entityIdScope ) {
+    public Observable<IndexOperationMessage> index( final EntityIndexOperation entityIndexOperation ) {
 
-        final ApplicationScope applicationScope = entityIdScope.getApplicationScope();
+        final ApplicationScope applicationScope = entityIndexOperation.getApplicationScope();
 
-        final Id entityId = entityIdScope.getId();
+        final Id entityId = entityIndexOperation.getId();
 
         //load the entity
-        return entityCollectionManagerFactory.createCollectionManager( applicationScope ).load( entityId )
+        return entityCollectionManagerFactory.createCollectionManager( applicationScope ).load( entityId ).filter(
+            entity -> {
+                final Field<Long> modified = entity.getField( Schema.PROPERTY_MODIFIED );
+
+                //only re-index if it has been updated and been updated after our timestamp
+                return modified != null && modified.getValue() >= entityIndexOperation.getUpdatedSince();
+            } )
             //perform indexing on the task scheduler and start it
             .flatMap( entity -> indexService.indexEntity( applicationScope, entity ) );
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cb179d35/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
index 6faa695..96966bf 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
@@ -23,6 +23,7 @@ package org.apache.usergrid.corepersistence.asyncevents;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.usergrid.corepersistence.index.EntityIndexOperation;
 import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
 import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
@@ -97,10 +98,10 @@ public class InMemoryAsyncEventService implements AsyncEventService {
 
 
     @Override
-    public void index( final EntityIdScope entityIdScope ) {
+    public void index( final EntityIndexOperation entityIndexOperation ) {
 
 
-        run(eventBuilder.index( entityIdScope ));
+        run(eventBuilder.index( entityIndexOperation ));
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cb179d35/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/SQSAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/SQSAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/SQSAsyncEventService.java
index 415e5e8..1dbfd4e 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/SQSAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/SQSAsyncEventService.java
@@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.usergrid.corepersistence.index.EntityIndexOperation;
 import org.apache.usergrid.corepersistence.index.IndexProcessorFig;
 import org.apache.usergrid.corepersistence.index.IndexService;
 import org.apache.usergrid.exception.NotImplementedException;
@@ -184,7 +185,7 @@ public class SQSAsyncEventService implements AsyncEventService {
     }
 
 
-    @Override
+//    @Override
     public void index( final EntityIdScope entityIdScope ) {
         //queue the re-inex operation
         offer( entityIdScope );
@@ -346,4 +347,10 @@ public class SQSAsyncEventService implements AsyncEventService {
             subscriptions.add( subscription );
         }
     }
+
+
+    @Override
+    public void index( final EntityIndexOperation entityIdScope ) {
+        throw new NotImplementedException( "Implement me" );
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cb179d35/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/EntityIndexOperation.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/EntityIndexOperation.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/EntityIndexOperation.java
new file mode 100644
index 0000000..3548bbe
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/EntityIndexOperation.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.index;
+
+
+import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+
+/**
+ * The operation for re-indexing an entity.  The entity should be updated
+ * with an updated timestamp > updatedSince.
+ */
+public class EntityIndexOperation extends EntityIdScope {
+
+    private final long updatedSince;
+
+
+    public EntityIndexOperation( final ApplicationScope applicationScope, final Id id, final long updatedSince ) {
+        super( applicationScope, id );
+        this.updatedSince = updatedSince;
+    }
+
+
+    public long getUpdatedSince() {
+        return updatedSince;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cb179d35/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceRequestBuilder.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceRequestBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceRequestBuilder.java
new file mode 100644
index 0000000..07160d8
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceRequestBuilder.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.index;
+
+
+import java.util.UUID;
+
+import org.elasticsearch.action.index.IndexRequestBuilder;
+
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+
+import com.google.common.base.Optional;
+
+
+/**
+ * A builder interface to build our re-index request
+ */
+public interface IndexServiceRequestBuilder {
+
+    /**
+     * Set the application id
+     */
+    IndexServiceRequestBuilder withApplicationId( final UUID applicationId );
+
+    /**
+     * Set the collection name.  If not set, every collection will be reindexed
+     * @param collectionName
+     * @return
+     */
+    IndexServiceRequestBuilder withCollection( final String collectionName );
+
+    /**
+     * Set our cursor to resume processing
+     * @param cursor
+     * @return
+     */
+    IndexServiceRequestBuilder withCursor(final String cursor);
+
+
+    /**
+     * Set the timestamp to re-index entities updated >= this timestamp
+     * @param timestamp
+     * @return
+     */
+    IndexServiceRequestBuilder withStartTimestamp(final Long timestamp);
+
+
+    /**
+     * Get the application scope
+     * @return
+     */
+    Optional<ApplicationScope> getApplicationScope();
+
+    /**
+     * Get the collection name
+     * @return
+     */
+    Optional<String> getCollectionName();
+
+    /**
+     * Get the cursor
+     * @return
+     */
+    Optional<String> getCursor();
+
+    /**
+     * Get the updated since timestamp
+     * @return
+     */
+    Optional<Long> getUpdateTimestamp();
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cb179d35/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceRequestBuilderImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceRequestBuilderImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceRequestBuilderImpl.java
new file mode 100644
index 0000000..3466674
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceRequestBuilderImpl.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.index;
+
+
+import java.util.UUID;
+
+
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+
+import com.google.common.base.Optional;
+
+
+public class IndexServiceRequestBuilderImpl implements IndexServiceRequestBuilder {
+
+
+    /**
+     *
+     final Observable<ApplicationScope>  applicationScopes = appId.isPresent()? Observable.just( getApplicationScope(appId.get()) ) : allApplicationsObservable.getData();
+
+     final String newCursor = StringUtils.sanitizeUUID( UUIDGenerator.newTimeUUID() );
+
+     //create an observable that loads each entity and indexes it, start it running with publish
+     final ConnectableObservable<EdgeScope> runningReIndex =
+         allEntityIdsObservable.getEdgesToEntities( applicationScopes, collection, startTimestamp )
+
+             //for each edge, create our scope and index on it
+             .doOnNext( edge -> indexService.index( new EntityIdScope( edge.getApplicationScope(), edge.getEdge().getTargetNode() ) ) ).publish();
+
+
+
+     //start our sampler and state persistence
+     //take a sample every sample interval to allow us to resume state with minimal loss
+     runningReIndex.sample( indexProcessorFig.getReIndexSampleInterval(), TimeUnit.MILLISECONDS,
+         rxTaskScheduler.getAsyncIOScheduler() )
+         .doOnNext( edge -> {
+
+             final String serializedState = SerializableMapper.asString( edge );
+
+             mapManager.putString( newCursor, serializedState, INDEX_TTL );
+         } ).subscribe();
+
+
+     */
+
+    private Optional<UUID> withApplicationId;
+    private Optional<String> withCollectionName;
+    private Optional<String> cursor;
+    private Optional<Long> updateTimestamp;
+
+
+    /***
+     *
+     * @param applicationId
+     * @return
+     */
+    @Override
+    public IndexServiceRequestBuilder withApplicationId( final UUID applicationId ) {
+        this.withApplicationId = Optional.fromNullable(applicationId);
+        return this;
+    }
+
+
+    @Override
+    public IndexServiceRequestBuilder withCollection( final String collectionName ) {
+        this.withCollectionName = Optional.fromNullable( collectionName );
+        return this;
+    }
+
+
+    @Override
+    public IndexServiceRequestBuilder withCursor( final String cursor ) {
+        this.cursor = Optional.fromNullable( cursor );
+        return this;
+    }
+
+
+    @Override
+    public IndexServiceRequestBuilder withStartTimestamp( final Long timestamp ) {
+        this.updateTimestamp = Optional.fromNullable(timestamp  );
+        return this;
+    }
+
+
+    @Override
+    public Optional<ApplicationScope> getApplicationScope() {
+
+        if(this.withApplicationId.isPresent()){
+            return Optional.of(  CpNamingUtils.getApplicationScope( withApplicationId.get()));
+        }
+
+        return Optional.absent();
+    }
+
+
+    @Override
+    public Optional<String> getCollectionName() {
+        return withCollectionName;
+    }
+
+
+    @Override
+    public Optional<String> getCursor() {
+        return cursor;
+    }
+
+
+    @Override
+    public Optional<Long> getUpdateTimestamp() {
+        return updateTimestamp;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cb179d35/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java
index b878246..672b3c8 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java
@@ -33,5 +33,5 @@ public interface ReIndexAction {
      * Index this entity with the specified scope
      * @param entityIdScope
      */
-    void index( final EntityIdScope entityIdScope );
+    void index( final EntityIndexOperation entityIdScope );
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cb179d35/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java
index e594ad3..b25eca5 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java
@@ -45,16 +45,17 @@ public interface ReIndexService {
     /**
      * Perform an index rebuild
      *
-     * @param appId The applicationId to re-index, or all applications if absent
-     * @param collection The collection name to re-index.  Otherwise all collections in an app will be used.
-     * @param cursor An optional cursor to resume processing
-     * @param startTimestamp The time to start indexing from.  All edges >= this time will be indexed.
+     * @param indexServiceRequestBuilder The builder to build the request
      * @return
      */
-    IndexResponse rebuildIndex( final Optional<UUID> appId, final Optional<String> collection, final Optional<String> cursor,
-                        final Optional<Long> startTimestamp);
+    IndexResponse rebuildIndex(final IndexServiceRequestBuilder indexServiceRequestBuilder);
 
 
+    /**
+     * Generate a build for the index
+     * @return
+     */
+    IndexServiceRequestBuilder getBuilder();
 
     /**
      * The response when requesting a re-index operation

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cb179d35/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
index bd1bff9..a2fa09a 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
@@ -20,7 +20,6 @@
 package org.apache.usergrid.corepersistence.index;
 
 
-import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
@@ -28,7 +27,6 @@ import org.apache.usergrid.corepersistence.rx.impl.AllApplicationsObservable;
 import org.apache.usergrid.corepersistence.rx.impl.AllEntityIdsObservable;
 import org.apache.usergrid.corepersistence.rx.impl.EdgeScope;
 import org.apache.usergrid.corepersistence.util.CpNamingUtils;
-import org.apache.usergrid.corepersistence.util.SerializableMapper;
 import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
 import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
@@ -46,13 +44,11 @@ import com.google.inject.Singleton;
 import rx.Observable;
 import rx.observables.ConnectableObservable;
 
-import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getApplicationScope;
-
 
 @Singleton
 public class ReIndexServiceImpl implements ReIndexService {
 
-    private static final MapScope RESUME_MAP_SCOPTE =
+    private static final MapScope RESUME_MAP_SCOPE =
         new MapScopeImpl( CpNamingUtils.getManagementApplicationId(), "reindexresume" );
 
     //Keep cursors to resume re-index for 1 day.  This is far beyond it's useful real world implications anyway.
@@ -78,31 +74,41 @@ public class ReIndexServiceImpl implements ReIndexService {
         this.rxTaskScheduler = rxTaskScheduler;
         this.indexService = indexService;
 
-        this.mapManager = mapManagerFactory.createMapManager( RESUME_MAP_SCOPTE );
+        this.mapManager = mapManagerFactory.createMapManager( RESUME_MAP_SCOPE );
     }
 
 
 
+
+
     @Override
-    public IndexResponse rebuildIndex( final Optional<UUID> appId, final Optional<String> collection, final Optional<String> cursor,
-                                       final Optional<Long> startTimestamp ) {
+    public IndexResponse rebuildIndex( final IndexServiceRequestBuilder indexServiceRequestBuilder ) {
 
-        //load our last emitted Scope if a cursor is present
-        if ( cursor.isPresent() ) {
+          //load our last emitted Scope if a cursor is present
+        if ( indexServiceRequestBuilder.getCursor().isPresent() ) {
             throw new UnsupportedOperationException( "Build this" );
         }
 
 
-        final Observable<ApplicationScope>  applicationScopes = appId.isPresent()? Observable.just( getApplicationScope(appId.get()) ) : allApplicationsObservable.getData();
+        final Optional<ApplicationScope> appId = indexServiceRequestBuilder.getApplicationScope();
+        final Observable<ApplicationScope>  applicationScopes = appId.isPresent()? Observable.just( appId.get() ) : allApplicationsObservable.getData();
+
+
+
 
         final String newCursor = StringUtils.sanitizeUUID( UUIDGenerator.newTimeUUID() );
 
+        final long modifiedSince = indexServiceRequestBuilder.getUpdateTimestamp().or( Long.MIN_VALUE );
+
         //create an observable that loads each entity and indexes it, start it running with publish
         final ConnectableObservable<EdgeScope> runningReIndex =
-            allEntityIdsObservable.getEdgesToEntities( applicationScopes, collection, startTimestamp )
+            allEntityIdsObservable.getEdgesToEntities( applicationScopes,
+                indexServiceRequestBuilder.getCollectionName() )
 
                 //for each edge, create our scope and index on it
-                .doOnNext( edge -> indexService.index( new EntityIdScope( edge.getApplicationScope(), edge.getEdge().getTargetNode() ) ) ).publish();
+                .doOnNext( edge -> indexService.index(
+                    new EntityIndexOperation( edge.getApplicationScope(), edge.getEdge().getTargetNode(),
+                        modifiedSince ) ) ).publish();
 
 
 
@@ -112,9 +118,9 @@ public class ReIndexServiceImpl implements ReIndexService {
             rxTaskScheduler.getAsyncIOScheduler() )
             .doOnNext( edge -> {
 
-                final String serializedState = SerializableMapper.asString( edge );
-
-                mapManager.putString( newCursor, serializedState, INDEX_TTL );
+//                final String serializedState = SerializableMapper.asString( edge );
+//
+//                mapManager.putString( newCursor, serializedState, INDEX_TTL );
             } ).subscribe();
 
 
@@ -124,6 +130,12 @@ public class ReIndexServiceImpl implements ReIndexService {
 
         return new IndexResponse( newCursor, runningReIndex );
     }
+
+
+    @Override
+    public IndexServiceRequestBuilder getBuilder() {
+        return new IndexServiceRequestBuilderImpl();
+    }
 }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cb179d35/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservable.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservable.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservable.java
index b9e5373..aada240 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservable.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservable.java
@@ -44,9 +44,8 @@ public interface AllEntityIdsObservable {
      * Get all edges that represent edges to entities in the system
      * @param appScopes
      * @param edgeType The edge type to use (if specified)
-     * @param startTime The time to start with
      * @return
      */
-    Observable<EdgeScope> getEdgesToEntities(final Observable<ApplicationScope> appScopes, final Optional<String> edgeType, final Optional<Long> startTime);
+    Observable<EdgeScope> getEdgesToEntities(final Observable<ApplicationScope> appScopes, final Optional<String> edgeType);
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cb179d35/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservableImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservableImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservableImpl.java
index 257fab1..6a95e7b 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservableImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservableImpl.java
@@ -81,12 +81,13 @@ public class AllEntityIdsObservableImpl implements AllEntityIdsObservable {
 
 
     @Override
-    public Observable<EdgeScope> getEdgesToEntities( final Observable<ApplicationScope> appScopes, final Optional<String> edgeType,  final Optional<Long> startTime) {
+    public Observable<EdgeScope> getEdgesToEntities( final Observable<ApplicationScope> appScopes, final Optional<String> edgeType) {
 
         return appScopes.flatMap( applicationScope -> {
             final GraphManager gm = graphManagerFactory.createEdgeManager( applicationScope );
 
-            return edgesObservable.edgesFromSourceAscending( gm, applicationScope.getApplication(),edgeType,  startTime ).map( edge -> new EdgeScope(applicationScope, edge ));
+            return edgesObservable.edgesFromSourceDescending( gm, applicationScope.getApplication(), edgeType )
+                                  .map( edge -> new EdgeScope(applicationScope, edge ));
         } );
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cb179d35/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/SerializableMapper.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/SerializableMapper.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/SerializableMapper.java
deleted file mode 100644
index 19ecf6d..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/SerializableMapper.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.util;
-
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.Serializable;
-import java.nio.charset.StandardCharsets;
-
-import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
-import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.dataformat.smile.SmileFactory;
-import com.google.common.base.Preconditions;
-
-
-/**
- * A simple utility for serializing serializable classes to/and from strings.  To be used for small object storage only, such as resume on re-index
- * storing data such as entities should be specialized.
- */
-public class SerializableMapper {
-
-    private static final SmileFactory SMILE_FACTORY = new SmileFactory();
-
-    private static final ObjectMapper MAPPER = new ObjectMapper( SMILE_FACTORY );
-
-    static{
-        MAPPER.enableDefaultTypingAsProperty( ObjectMapper.DefaultTyping.JAVA_LANG_OBJECT, "@class" );
-        SMILE_FACTORY.delegateToTextual( true );
-    }
-
-    /**
-     * Get value as a string
-     * @param toSerialize
-     * @param <T>
-     * @return
-     */
-    public static <T extends Serializable> String asString(final T toSerialize){
-        try {
-            return MAPPER.writeValueAsString( toSerialize );
-        }
-        catch ( JsonProcessingException e ) {
-            throw new RuntimeException( "Unable to process json", e );
-        }
-    }
-
-
-    /**
-     * Write the value as a string
-     * @param <T>
-     * @param serialized
-     * @param clazz
-     * @return
-     */
-    public static <T extends Serializable> T fromString(final String serialized, final Class<T> clazz){
-        Preconditions.checkNotNull(serialized, "serialized string cannot be null");
-
-
-        InputStream stream = new ByteArrayInputStream(serialized.getBytes( StandardCharsets.UTF_8));
-
-        try {
-            return MAPPER.readValue( stream, clazz );
-        }
-        catch ( IOException e ) {
-            throw new RuntimeException( String.format("Unable to parse string '%s'", serialized), e );
-        }
-    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cb179d35/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java
index 92f2b01..9e84219 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java
@@ -89,7 +89,7 @@ public class EdgesToTargetObservableIT extends AbstractCoreIT {
 
         final GraphManager gm = managerCache.getGraphManager( scope );
 
-        edgesFromSourceObservable.edgesFromSourceAscending( gm, applicationId ).doOnNext( edge -> {
+        edgesFromSourceObservable.edgesFromSourceDescending( gm, applicationId ).doOnNext( edge -> {
             final String edgeType = edge.getType();
             final Id target = edge.getTargetNode();
 
@@ -118,7 +118,7 @@ public class EdgesToTargetObservableIT extends AbstractCoreIT {
 
         //test connections
 
-        edgesFromSourceObservable.edgesFromSourceAscending( gm, source ).doOnNext( edge -> {
+        edgesFromSourceObservable.edgesFromSourceDescending( gm, source ).doOnNext( edge -> {
             final String edgeType = edge.getType();
             final Id target = edge.getTargetNode();
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cb179d35/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java
index 9f0bd60..964e13d 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java
@@ -38,7 +38,7 @@ public interface EdgesObservable {
      * @param sourceNode
      * @return
      */
-    Observable<Edge> edgesFromSourceAscending( final GraphManager gm, final Id sourceNode );
+    Observable<Edge> edgesFromSourceDescending( final GraphManager gm, final Id sourceNode );
 
 
     /**
@@ -46,11 +46,10 @@ public interface EdgesObservable {
      * @param gm
      * @param sourceNode
      * @param edgeType The edge type if specified.  Otherwise all types will be used
-     * @param startTimestamp The start timestamp if specfiied, otherwise Long.MIN will be used
      * @return
      */
-    Observable<Edge> edgesFromSourceAscending( final GraphManager gm, final Id sourceNode,final Optional<String> edgeType,
-                                               final Optional<Long> startTimestamp );
+    Observable<Edge> edgesFromSourceDescending( final GraphManager gm, final Id sourceNode,
+                                                final Optional<String> edgeType );
 
     /**
      * Get all edges from the source node with the target type

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cb179d35/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java
index df9e094..7240798 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java
@@ -55,7 +55,7 @@ public class EdgesObservableImpl implements EdgesObservable {
      * Get all edges from the source
      */
     @Override
-    public Observable<Edge> edgesFromSourceAscending( final GraphManager gm, final Id sourceNode ) {
+    public Observable<Edge> edgesFromSourceDescending( final GraphManager gm, final Id sourceNode ) {
         final Observable<String> edgeTypes =
             gm.getEdgeTypesFromSource( new SimpleSearchEdgeType( sourceNode, null, null ) );
 
@@ -71,8 +71,8 @@ public class EdgesObservableImpl implements EdgesObservable {
 
 
     @Override
-    public Observable<Edge> edgesFromSourceAscending( final GraphManager gm, final Id sourceNode, final Optional<String> edgeTypeInput,
-                                                      final Optional<Long> startTimestamp ) {
+    public Observable<Edge> edgesFromSourceDescending( final GraphManager gm, final Id sourceNode,
+                                                       final Optional<String> edgeTypeInput ) {
 
 
 
@@ -85,7 +85,7 @@ public class EdgesObservableImpl implements EdgesObservable {
                 logger.debug( "Loading edges of edgeType {} from {}", edgeType, sourceNode );
 
                 return gm.loadEdgesFromSource(
-                    new SimpleSearchByEdgeType( sourceNode, edgeType, startTimestamp.or( Long.MIN_VALUE ), SearchByEdgeType.Order.ASCENDING,
+                    new SimpleSearchByEdgeType( sourceNode, edgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
                         Optional.<Edge>absent() ) );
         } );
     }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cb179d35/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/TargetIdObservableImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/TargetIdObservableImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/TargetIdObservableImpl.java
index 5cf5117..82c7d54 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/TargetIdObservableImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/TargetIdObservableImpl.java
@@ -55,7 +55,7 @@ public class TargetIdObservableImpl implements TargetIdObservable {
     public Observable<Id> getTargetNodes(final GraphManager gm, final Id sourceNode) {
 
         //only search edge types that start with collections
-        return edgesFromSourceObservable.edgesFromSourceAscending( gm, sourceNode ).map( new Func1<Edge, Id>() {
+        return edgesFromSourceObservable.edgesFromSourceDescending( gm, sourceNode ).map( new Func1<Edge, Id>() {
 
 
             @Override

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cb179d35/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/EdgeDataMigrationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/EdgeDataMigrationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/EdgeDataMigrationImpl.java
index 0df26ff..d6c42e3 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/EdgeDataMigrationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/EdgeDataMigrationImpl.java
@@ -87,7 +87,7 @@ public class EdgeDataMigrationImpl implements DataMigration<GraphNode> {
             final GraphManager gm = graphManagerFactory.createEdgeManager( graphNode.applicationScope );
 
             //get edges from the source
-            return edgesFromSourceObservable.edgesFromSourceAscending( gm, graphNode.entryNode ).buffer( 1000 )
+            return edgesFromSourceObservable.edgesFromSourceDescending( gm, graphNode.entryNode ).buffer( 1000 )
                                             .doOnNext( edges -> {
                                                     final MutationBatch batch = keyspace.prepareMutationBatch();