You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/05/22 02:22:03 UTC
[05/19] incubator-usergrid git commit: Finishes changes before tests
Finishes changes before tests
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/20c9b350
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/20c9b350
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/20c9b350
Branch: refs/heads/USERGRID-641
Commit: 20c9b3509cf96a6ecab1a45a2c572fd6a041e00d
Parents: cb179d3
Author: Todd Nine <tn...@apigee.com>
Authored: Thu May 14 17:19:27 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Thu May 14 17:19:27 2015 -0600
----------------------------------------------------------------------
.../asyncevents/InMemoryAsyncEventService.java | 2 -
.../index/EdgeScopeSerializer.java | 41 ++++
.../index/IndexProcessorFig.java | 6 +-
.../corepersistence/index/ReIndexService.java | 75 +++---
.../index/ReIndexServiceImpl.java | 226 +++++++++++++++----
.../pipeline/cursor/CursorSerializerUtil.java | 54 ++++-
.../pipeline/cursor/RequestCursor.java | 9 +-
.../pipeline/cursor/ResponseCursor.java | 49 ++--
.../pipeline/read/AbstractPathFilter.java | 30 ---
.../pipeline/read/CursorSeek.java | 53 +++++
.../rx/impl/AllEntityIdsObservable.java | 4 +-
.../rx/impl/AllEntityIdsObservableImpl.java | 5 +-
.../PerformanceEntityRebuildIndexTest.java | 5 +-
.../graph/serialization/EdgesObservable.java | 21 +-
.../serialization/impl/EdgesObservableImpl.java | 4 +-
15 files changed, 422 insertions(+), 162 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/20c9b350/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
index 96966bf..ddcf826 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
@@ -99,8 +99,6 @@ public class InMemoryAsyncEventService implements AsyncEventService {
@Override
public void index( final EntityIndexOperation entityIndexOperation ) {
-
-
run(eventBuilder.index( entityIndexOperation ));
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/20c9b350/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/EdgeScopeSerializer.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/EdgeScopeSerializer.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/EdgeScopeSerializer.java
new file mode 100644
index 0000000..2a6a5ac
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/EdgeScopeSerializer.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.index;
+
+
+import org.apache.usergrid.corepersistence.pipeline.cursor.AbstractCursorSerializer;
+import org.apache.usergrid.corepersistence.rx.impl.EdgeScope;
+
+
+/**
+ * Cursor serializer for {@link EdgeScope} values; a shared instance is available as {@link #INSTANCE}
+ */
+public class EdgeScopeSerializer extends AbstractCursorSerializer<EdgeScope> {
+
+
+ public static final EdgeScopeSerializer INSTANCE = new EdgeScopeSerializer();
+
+ @Override
+ protected Class<EdgeScope> getType() {
+ return EdgeScope.class;
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/20c9b350/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
index fe9d3fd..8e835e2 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
@@ -78,9 +78,9 @@ public interface IndexProcessorFig extends GuicyFig {
String getQueueImplementation();
- @Default("30000")
- @Key("elasticsearch.reindex.sample.interval")
- long getReIndexSampleInterval();
+ @Default("10000")
+ @Key("elasticsearch.reindex.flush.interval")
+ int getUpdateInterval();
@Default("false")
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/20c9b350/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java
index b25eca5..f8955dd 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java
@@ -20,22 +20,6 @@
package org.apache.usergrid.corepersistence.index;
-import java.util.UUID;
-import java.util.concurrent.Callable;
-import java.util.concurrent.FutureTask;
-
-import org.apache.usergrid.corepersistence.rx.impl.AllEntityIdsObservable;
-import org.apache.usergrid.corepersistence.rx.impl.EdgeScope;
-import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-
-import com.google.common.base.Optional;
-
-import rx.Observable;
-import rx.Observer;
-import rx.observables.ConnectableObservable;
-
-
/**
* An interface for re-indexing all entities in an application
*/
@@ -46,48 +30,75 @@ public interface ReIndexService {
* Perform an index rebuild
*
* @param indexServiceRequestBuilder The builder to build the request
- * @return
*/
- IndexResponse rebuildIndex(final IndexServiceRequestBuilder indexServiceRequestBuilder);
+ IndexResponse rebuildIndex( final IndexServiceRequestBuilder indexServiceRequestBuilder );
/**
* Generate a build for the index
- * @return
*/
IndexServiceRequestBuilder getBuilder();
+
+ /**
+ * Get the status of a job
+ * @param jobId The jobId returned during the rebuild index
+ * @return The response carrying the job's id, status, number processed and last updated time
+ */
+ IndexResponse getStatus( final String jobId );
+
+
/**
* The response when requesting a re-index operation
*/
class IndexResponse {
- final String cursor;
- final ConnectableObservable<EdgeScope> indexedEdgecount;
+ final String jobId;
+ final String status;
+ final long numberProcessed;
+ final long lastUpdated;
+
+
+ public IndexResponse( final String jobId, final String status, final long numberProcessed,
+ final long lastUpdated ) {
+ this.jobId = jobId;
+ this.status = status;
+ this.numberProcessed = numberProcessed;
+ this.lastUpdated = lastUpdated;
+ }
- public IndexResponse( final String cursor, final ConnectableObservable<EdgeScope> indexedEdgecount ) {
- this.cursor = cursor;
- this.indexedEdgecount = indexedEdgecount;
+ /**
+ * Get the jobId used to resume this operation
+ */
+ public String getJobId() {
+ return jobId;
+ }
+
+
+ /**
+ * Get the last updated time, as a long
+ * @return
+ */
+ public long getLastUpdated() {
+ return lastUpdated;
}
/**
- * Get the cursor used to resume this operation
+ * Get the number of records processed
* @return
*/
- public String getCursor() {
- return cursor;
+ public long getNumberProcessed() {
+ return numberProcessed;
}
/**
- * Return the observable of all edges to be indexed.
- *
- * Note that after subscribing "connect" will need to be called to ensure that processing begins
+ * Get the status
* @return
*/
- public ConnectableObservable<EdgeScope> getCount() {
- return indexedEdgecount;
+ public String getStatus() {
+ return status;
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/20c9b350/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
index a2fa09a..d828fc2 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
@@ -20,29 +20,32 @@
package org.apache.usergrid.corepersistence.index;
-import java.util.concurrent.TimeUnit;
+import java.util.List;
import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
+import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializerUtil;
+import org.apache.usergrid.corepersistence.pipeline.read.CursorSeek;
import org.apache.usergrid.corepersistence.rx.impl.AllApplicationsObservable;
import org.apache.usergrid.corepersistence.rx.impl.AllEntityIdsObservable;
import org.apache.usergrid.corepersistence.rx.impl.EdgeScope;
import org.apache.usergrid.corepersistence.util.CpNamingUtils;
-import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
-import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.core.util.StringUtils;
+import org.apache.usergrid.persistence.graph.Edge;
import org.apache.usergrid.persistence.map.MapManager;
import org.apache.usergrid.persistence.map.MapManagerFactory;
import org.apache.usergrid.persistence.map.MapScope;
import org.apache.usergrid.persistence.map.impl.MapScopeImpl;
import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import rx.Observable;
-import rx.observables.ConnectableObservable;
+import rx.schedulers.Schedulers;
@Singleton
@@ -51,14 +54,18 @@ public class ReIndexServiceImpl implements ReIndexService {
private static final MapScope RESUME_MAP_SCOPE =
new MapScopeImpl( CpNamingUtils.getManagementApplicationId(), "reindexresume" );
- //Keep cursors to resume re-index for 1 day. This is far beyond it's useful real world implications anyway.
+ //Keep cursors to resume re-index for 10 days. This is far beyond its useful real world implications anyway.
private static final int INDEX_TTL = 60 * 60 * 24 * 10;
+ private static final String MAP_CURSOR_KEY = "cursor";
+ private static final String MAP_COUNT_KEY = "count";
+ private static final String MAP_STATUS_KEY = "status";
+ private static final String MAP_UPDATED_KEY = "lastUpdated";
+
private final AllApplicationsObservable allApplicationsObservable;
private final AllEntityIdsObservable allEntityIdsObservable;
private final IndexProcessorFig indexProcessorFig;
- private final RxTaskScheduler rxTaskScheduler;
private final MapManager mapManager;
private final AsyncEventService indexService;
@@ -66,69 +73,61 @@ public class ReIndexServiceImpl implements ReIndexService {
@Inject
public ReIndexServiceImpl( final AllEntityIdsObservable allEntityIdsObservable,
final MapManagerFactory mapManagerFactory,
- final AllApplicationsObservable allApplicationsObservable, final IndexProcessorFig indexProcessorFig,
- final RxTaskScheduler rxTaskScheduler, final AsyncEventService indexService ) {
+ final AllApplicationsObservable allApplicationsObservable,
+ final IndexProcessorFig indexProcessorFig, final AsyncEventService indexService ) {
this.allEntityIdsObservable = allEntityIdsObservable;
this.allApplicationsObservable = allApplicationsObservable;
this.indexProcessorFig = indexProcessorFig;
- this.rxTaskScheduler = rxTaskScheduler;
this.indexService = indexService;
this.mapManager = mapManagerFactory.createMapManager( RESUME_MAP_SCOPE );
}
-
-
-
@Override
public IndexResponse rebuildIndex( final IndexServiceRequestBuilder indexServiceRequestBuilder ) {
- //load our last emitted Scope if a cursor is present
- if ( indexServiceRequestBuilder.getCursor().isPresent() ) {
- throw new UnsupportedOperationException( "Build this" );
- }
+ //load our last emitted Scope if a cursor is present
+ final Optional<EdgeScope> cursor = parseCursor( indexServiceRequestBuilder.getCursor() );
+
+
+ final CursorSeek<Edge> cursorSeek = getResumeEdge( cursor );
final Optional<ApplicationScope> appId = indexServiceRequestBuilder.getApplicationScope();
- final Observable<ApplicationScope> applicationScopes = appId.isPresent()? Observable.just( appId.get() ) : allApplicationsObservable.getData();
+ Preconditions.checkArgument( !( cursor.isPresent() && appId.isPresent() ),
+ "You cannot specify an app id and a cursor. When resuming with cursor you must omit the appid" );
+
+ final Observable<ApplicationScope> applicationScopes = getApplications( cursor, appId );
- final String newCursor = StringUtils.sanitizeUUID( UUIDGenerator.newTimeUUID() );
+ final String jobId = StringUtils.sanitizeUUID( UUIDGenerator.newTimeUUID() );
final long modifiedSince = indexServiceRequestBuilder.getUpdateTimestamp().or( Long.MIN_VALUE );
//create an observable that loads each entity and indexes it, start it running with publish
- final ConnectableObservable<EdgeScope> runningReIndex =
- allEntityIdsObservable.getEdgesToEntities( applicationScopes,
- indexServiceRequestBuilder.getCollectionName() )
-
- //for each edge, create our scope and index on it
- .doOnNext( edge -> indexService.index(
- new EntityIndexOperation( edge.getApplicationScope(), edge.getEdge().getTargetNode(),
- modifiedSince ) ) ).publish();
+ final Observable<EdgeScope> runningReIndex = allEntityIdsObservable.getEdgesToEntities( applicationScopes,
+ indexServiceRequestBuilder.getCollectionName(), cursorSeek.getSeekValue() )
+ //for each edge, create our scope and index on it
+ .doOnNext( edge -> indexService.index(
+ new EntityIndexOperation( edge.getApplicationScope(), edge.getEdge().getTargetNode(),
+ modifiedSince ) ) );
//start our sampler and state persistence
//take a sample every sample interval to allow us to resume state with minimal loss
- runningReIndex.sample( indexProcessorFig.getReIndexSampleInterval(), TimeUnit.MILLISECONDS,
- rxTaskScheduler.getAsyncIOScheduler() )
- .doOnNext( edge -> {
-
-// final String serializedState = SerializableMapper.asString( edge );
-//
-// mapManager.putString( newCursor, serializedState, INDEX_TTL );
- } ).subscribe();
+ runningReIndex.buffer( indexProcessorFig.getUpdateInterval() )
+ //create our flushing collector and flush the edge scopes to it
+ .collect( () -> new FlushingCollector( jobId ),
+ ( ( flushingCollector, edgeScopes ) -> flushingCollector.flushBuffer( edgeScopes ) ) ).doOnNext( flushingCollector-> flushingCollector.complete() )
+ //subscribe on our I/O scheduler and run the task
+ .subscribeOn( Schedulers.io() ).subscribe();
- //start pushing to both
- runningReIndex.connect();
-
-
- return new IndexResponse( newCursor, runningReIndex );
+ return new IndexResponse( jobId, "Started", 0, 0 );
}
@@ -136,6 +135,155 @@ public class ReIndexServiceImpl implements ReIndexService {
public IndexServiceRequestBuilder getBuilder() {
return new IndexServiceRequestBuilderImpl();
}
+
+ /** Look up the persisted state of a previously started re-index job; a null jobId is rejected up front. */
+ @Override
+ public IndexResponse getStatus( final String jobId ) {
+ Preconditions.checkNotNull( jobId, "jobId must not be null" );
+ return getIndexResponse( jobId );
+ }
+
+
+ /**
+ * Simple collector that keeps a running count and flushes state every time a buffer is provided. Writes the final state when complete
+ */
+ private class FlushingCollector {
+
+ private final String jobId;
+ private long count;
+
+
+ private FlushingCollector( final String jobId ) {
+ this.jobId = jobId;
+ }
+
+
+ public void flushBuffer( final List<EdgeScope> buffer ) {
+ count += buffer.size();
+
+ //persist the last edge of this buffer as the cursor state so the job can resume from it
+ if ( buffer.size() > 0 ) {
+ writeCursorState( jobId, buffer.get( buffer.size() - 1 ) );
+ }
+
+ writeStateMeta( jobId, "InProgress", count, System.currentTimeMillis() );
+ }
+
+ public void complete(){
+ writeStateMeta( jobId, "Complete", count, System.currentTimeMillis() );
+ }
+ }
+
+
+ /**
+ * Wrap the edge from a parsed cursor (if any) in a CursorSeek so the edge scan can resume from it
+ * @param edgeScope The optional edge scope from the cursor
+ * @return A seek holding the cursor's edge, or a seek with an absent value when no cursor was supplied
+ */
+ private CursorSeek<Edge> getResumeEdge( final Optional<EdgeScope> edgeScope ) {
+
+
+ if ( edgeScope.isPresent() ) {
+ return new CursorSeek<>( Optional.of( edgeScope.get().getEdge() ) );
+ }
+
+ return new CursorSeek<>( Optional.absent() );
+ }
+
+
+ /**
+ * Generate the observable of application scopes to process: resume at the cursor's app, else the single requested app, else all apps
+ */
+ private Observable<ApplicationScope> getApplications( final Optional<EdgeScope> cursor,
+ final Optional<ApplicationScope> appId ) {
+ //cursor is present use it and skip until we hit that app
+ if ( cursor.isPresent() ) {
+
+ final EdgeScope cursorValue = cursor.get();
+ //we have a cursor and an application scope that was used.
+ return allApplicationsObservable.getData().skipWhile(
+ applicationScope -> !cursorValue.getApplicationScope().equals( applicationScope ) );
+ }
+ //this is intentional. A cursor takes precedence, so the explicit app id is only consulted when no cursor is present
+ else if ( appId.isPresent() ) {
+ return Observable.just( appId.get() );
+ }
+
+ return allApplicationsObservable.getData();
+ }
+
+
+ /**
+ * Swap our cursor (the jobId of a previous run) for the optional edgescope persisted for it; absent if none was supplied or stored
+ */
+ private Optional<EdgeScope> parseCursor( final Optional<String> cursor ) {
+
+ if ( !cursor.isPresent() ) {
+ return Optional.absent();
+ }
+
+ //writeCursorState stores the state under jobId + MAP_CURSOR_KEY, so it must be read back from that same key
+ final String persistedCursor = mapManager.getString( cursor.get() + MAP_CURSOR_KEY );
+
+ if ( persistedCursor == null ) {
+ return Optional.absent();
+ }
+
+ final JsonNode node = CursorSerializerUtil.fromString( persistedCursor );
+
+ final EdgeScope edgeScope = EdgeScopeSerializer.INSTANCE.fromJsonNode( node, CursorSerializerUtil.getMapper() );
+
+ return Optional.of( edgeScope );
+ }
+
+
+ /**
+ * Serialize the edge scope and write it to the map under jobId + MAP_CURSOR_KEY, expiring after INDEX_TTL
+ */
+ private void writeCursorState( final String jobId, final EdgeScope edge ) {
+
+ final JsonNode node = EdgeScopeSerializer.INSTANCE.toNode( CursorSerializerUtil.getMapper(), edge );
+
+ final String serializedState = CursorSerializerUtil.asString( node );
+
+ mapManager.putString( jobId + MAP_CURSOR_KEY, serializedState, INDEX_TTL );
+ }
+
+
+ /**
+ * Write our state meta data into cassandra so everyone can see it
+ * @param jobId The job id, used as the prefix of each key written
+ * @param status The status string to store for the job
+ * @param processedCount The number of records processed so far
+ * @param lastUpdated The time of this update, as a long
+ */
+ private void writeStateMeta( final String jobId, final String status, final long processedCount,
+ final long lastUpdated ) {
+
+ mapManager.putString( jobId + MAP_STATUS_KEY, status );
+ mapManager.putLong( jobId + MAP_COUNT_KEY, processedCount );
+ mapManager.putLong( jobId + MAP_UPDATED_KEY, lastUpdated );
+ }
+
+
+ /**
+ * Get the index response from the jobId, reading the values stored by writeStateMeta
+ * @param jobId The id of the job to look up
+ * @return The stored status, processed count and last updated time (throws IllegalArgumentException if no status exists)
+ */
+ private IndexResponse getIndexResponse( final String jobId ) {
+
+ final String status = mapManager.getString( jobId+MAP_STATUS_KEY );
+
+ if(status == null){
+ throw new IllegalArgumentException( "Could not find a job with id " + jobId );
+ }
+
+ final long processedCount = mapManager.getLong( jobId + MAP_COUNT_KEY );
+ final long lastUpdated = mapManager.getLong( jobId + MAP_UPDATED_KEY );
+
+ return new IndexResponse( jobId, status, processedCount, lastUpdated );
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/20c9b350/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorSerializerUtil.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorSerializerUtil.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorSerializerUtil.java
index fea0364..7acdd00 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorSerializerUtil.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorSerializerUtil.java
@@ -20,10 +20,15 @@
package org.apache.usergrid.corepersistence.pipeline.cursor;
-import com.fasterxml.jackson.core.Base64Variant;
-import com.fasterxml.jackson.core.Base64Variants;
+import java.io.IOException;
+import java.util.Base64;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.dataformat.smile.SmileFactory;
+import com.google.common.base.Preconditions;
/**
@@ -35,9 +40,54 @@ public class CursorSerializerUtil {
private static final ObjectMapper MAPPER = new ObjectMapper( SMILE_FACTORY );
+ /**
+ * Arbitrary number, just meant to keep us from having a DOS issue
+ */
+ private static final int MAX_SIZE = 1024;
+
public static ObjectMapper getMapper() {
return MAPPER;
}
+
+ /**
+ * Turn the json node in to a base64 (url safe) encoded SMILE binary; throws RuntimeException if it cannot be written
+ */
+ public static String asString( final JsonNode node ) {
+ final byte[] output;
+ try {
+ output = MAPPER.writeValueAsBytes( node );
+ }
+ catch ( JsonProcessingException e ) {
+ throw new RuntimeException( "Unable to create output from json node " + node, e );
+ }
+
+ //generate a base64 url safe string
+ final String value = Base64.getUrlEncoder().encodeToString( output );
+
+ return value;
+ }
+
+
+ /**
+ * Parse the base64 encoded binary string; rejects input longer than MAX_SIZE and throws RuntimeException if it cannot be read
+ */
+ public static JsonNode fromString( final String base64EncodedJson ) {
+
+ Preconditions.checkArgument( base64EncodedJson.length() <= MAX_SIZE,
+ "Your cursor must be less than " + MAX_SIZE + " chars in length" );
+
+ final byte[] data = Base64.getUrlDecoder().decode( base64EncodedJson );
+
+ JsonNode jsonNode;
+ try {
+ jsonNode = MAPPER.readTree( data );
+ }
+ catch ( IOException e ) {
+ throw new RuntimeException( "Unable to parse json node from string " + base64EncodedJson, e );
+ }
+
+ return jsonNode;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/20c9b350/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/RequestCursor.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/RequestCursor.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/RequestCursor.java
index 870edbb..dc6ae71 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/RequestCursor.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/RequestCursor.java
@@ -37,10 +37,6 @@ import com.google.common.base.Preconditions;
*/
public class RequestCursor {
- /**
- * Aritrary number, just meant to keep us from having a DOS issue
- */
- private static final int MAX_SIZE = 1024;
private static final int MAX_CURSOR_COUNT = 100;
@@ -83,11 +79,8 @@ public class RequestCursor {
try {
- Preconditions.checkArgument( cursor.length() <= MAX_SIZE, "Your cursor must be less than " + MAX_SIZE + " chars in length");
-
- final byte[] data = Base64.getUrlDecoder().decode( cursor );
- JsonNode jsonNode = MAPPER.readTree( data );
+ JsonNode jsonNode = CursorSerializerUtil.fromString( cursor );
Preconditions
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/20c9b350/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/ResponseCursor.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/ResponseCursor.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/ResponseCursor.java
index dbd8b88..dc4bf39 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/ResponseCursor.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/ResponseCursor.java
@@ -20,11 +20,8 @@
package org.apache.usergrid.corepersistence.pipeline.cursor;
-import java.util.Base64;
-
import org.apache.usergrid.corepersistence.pipeline.read.EdgePath;
-import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -53,8 +50,8 @@ public class ResponseCursor {
/**
- * Lazyily encoded deliberately. If the user doesn't care about a cursor and is using streams, we dont' want to take the
- * time to calculate it
+ * Lazily encoded deliberately. If the user doesn't care about a cursor and is using streams, we don't want to
+ * take the time to calculate it
*/
public Optional<String> encodeAsString() {
@@ -68,42 +65,34 @@ public class ResponseCursor {
return encodedValue;
}
+ //no edge path, short circuit
- try {
-
- //no edge path, short circuit
-
- final ObjectNode map = MAPPER.createObjectNode();
+ final ObjectNode map = MAPPER.createObjectNode();
- Optional<EdgePath> current = edgePath;
+ Optional<EdgePath> current = edgePath;
- //traverse each edge and add them to our json
- do {
+ //traverse each edge and add them to our json
+ do {
- final EdgePath edgePath = current.get();
- final Object cursorValue = edgePath.getCursorValue();
- final CursorSerializer serializer = edgePath.getSerializer();
- final int filterId = edgePath.getFilterId();
+ final EdgePath edgePath = current.get();
+ final Object cursorValue = edgePath.getCursorValue();
+ final CursorSerializer serializer = edgePath.getSerializer();
+ final int filterId = edgePath.getFilterId();
- final JsonNode serialized = serializer.toNode( MAPPER, cursorValue );
- map.put( String.valueOf( filterId ), serialized );
+ final JsonNode serialized = serializer.toNode( MAPPER, cursorValue );
+ map.put( String.valueOf( filterId ), serialized );
- current = current.get().getPrevious();
- }
- while ( current.isPresent() );
+ current = current.get().getPrevious();
+ }
+ while ( current.isPresent() );
- final byte[] output = MAPPER.writeValueAsBytes( map );
+ //generate a base64 url safe string
+ final String value = CursorSerializerUtil.asString( map );
- //generate a base64 url save string
- final String value = Base64.getUrlEncoder().encodeToString( output );
+ encodedValue = Optional.of( value );
- encodedValue = Optional.of( value );
- }
- catch ( JsonProcessingException e ) {
- throw new CursorParseException( "Unable to serialize cursor", e );
- }
return encodedValue;
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/20c9b350/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractPathFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractPathFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractPathFilter.java
index c68dc4a..0f9ac9b 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractPathFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractPathFilter.java
@@ -76,34 +76,4 @@ public abstract class AbstractPathFilter<T, R, C extends Serializable> extends A
* Return the class to be used when parsing the cursor
*/
protected abstract CursorSerializer<C> getCursorSerializer();
-
-
- /**
- * An internal class that holds a mutable state. When resuming, we only ever honor the seek value on the first call. Afterwards, we will seek from the beginning on newly emitted values.
- * Calling get will return the first value to seek, or absent if not specified. Subsequent calls will return absent. Callers should treat the results as seek values for each operation
- */
- protected static class CursorSeek<C> {
-
- private Optional<C> seek;
-
- private CursorSeek(final Optional<C> cursorValue){
- seek = cursorValue;
- }
-
-
- /**
- * Get the seek value to use when searching
- * @return
- */
- public Optional<C> getSeekValue(){
- final Optional<C> toReturn = seek;
-
- seek = Optional.absent();
-
- return toReturn;
- }
-
-
-
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/20c9b350/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CursorSeek.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CursorSeek.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CursorSeek.java
new file mode 100644
index 0000000..b803658
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CursorSeek.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read;
+
+
+import com.google.common.base.Optional;
+
+
+/**
+ * A holder of mutable seek state. When resuming, we only ever honor the seek value on the first call. Afterwards, we will seek from the beginning on newly emitted values.
+ * Calling getSeekValue will return the first value to seek, or absent if not specified. Subsequent calls will return absent. Callers should treat the results as seek values for each operation
+ */
+public class CursorSeek<C> {
+
+ private Optional<C> seek;
+
+ public CursorSeek( final Optional<C> cursorValue ){
+ seek = cursorValue;
+ }
+
+
+ /**
+ * Get the seek value to use when searching, clearing it so it is only handed out once
+ * @return The value supplied at construction on the first call, absent on every later call
+ */
+ public Optional<C> getSeekValue(){
+ final Optional<C> toReturn = seek;
+
+ seek = Optional.absent();
+
+ return toReturn;
+ }
+
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/20c9b350/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservable.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservable.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservable.java
index aada240..9070609 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservable.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservable.java
@@ -24,6 +24,7 @@ import com.google.common.base.Optional;
import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.graph.Edge;
import rx.Observable;
@@ -44,8 +45,9 @@ public interface AllEntityIdsObservable {
* Get all edges that represent edges to entities in the system
* @param appScopes
* @param edgeType The edge type to use (if specified)
+ * @param lastEdge The edge to resume processing from
* @return
*/
- Observable<EdgeScope> getEdgesToEntities(final Observable<ApplicationScope> appScopes, final Optional<String> edgeType);
+ Observable<EdgeScope> getEdgesToEntities(final Observable<ApplicationScope> appScopes, final Optional<String> edgeType, final Optional<Edge> lastEdge);
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/20c9b350/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservableImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservableImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservableImpl.java
index 6a95e7b..0420a32 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservableImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservableImpl.java
@@ -28,6 +28,7 @@ import com.google.inject.Singleton;
import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.graph.Edge;
import org.apache.usergrid.persistence.graph.GraphManager;
import org.apache.usergrid.persistence.graph.GraphManagerFactory;
import org.apache.usergrid.persistence.graph.serialization.EdgesObservable;
@@ -81,12 +82,12 @@ public class AllEntityIdsObservableImpl implements AllEntityIdsObservable {
@Override
- public Observable<EdgeScope> getEdgesToEntities( final Observable<ApplicationScope> appScopes, final Optional<String> edgeType) {
+ public Observable<EdgeScope> getEdgesToEntities( final Observable<ApplicationScope> appScopes, final Optional<String> edgeType, final Optional<Edge> lastEdge) {
return appScopes.flatMap( applicationScope -> {
final GraphManager gm = graphManagerFactory.createEdgeManager( applicationScope );
- return edgesObservable.edgesFromSourceDescending( gm, applicationScope.getApplication(), edgeType )
+ return edgesObservable.edgesFromSourceDescending( gm, applicationScope.getApplication(), edgeType, lastEdge )
.map( edge -> new EdgeScope(applicationScope, edge ));
} );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/20c9b350/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java b/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
index a17c925..cb9919f 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
@@ -27,6 +27,7 @@ import java.util.concurrent.TimeUnit;
import com.google.common.base.Optional;
import org.apache.commons.lang.RandomStringUtils;
+import org.apache.usergrid.corepersistence.index.IndexServiceRequestBuilder;
import org.apache.usergrid.corepersistence.index.ReIndexService;
import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
import org.junit.After;
@@ -196,7 +197,9 @@ public class PerformanceEntityRebuildIndexTest extends AbstractCoreIT {
try {
- reIndexService.rebuildIndex( Optional.of( em.getApplicationId()), Optional.<String>of("catherders"), Optional.absent(), Optional.absent() );
+ final IndexServiceRequestBuilder builder = reIndexService.getBuilder().withApplicationId( em.getApplicationId() ).withCollection( "catherders" );
+
+ reIndexService.rebuildIndex(builder );
reporter.report();
registry.remove( meterName );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/20c9b350/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java
index 964e13d..78a1d4b 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java
@@ -42,16 +42,6 @@ public interface EdgesObservable {
/**
- * Return an observable of all edges from a source node. Ordered ascending, from the startTimestamp if specified
- * @param gm
- * @param sourceNode
- * @param edgeType The edge type if specified. Otherwise all types will be used
- * @return
- */
- Observable<Edge> edgesFromSourceDescending( final GraphManager gm, final Id sourceNode,
- final Optional<String> edgeType );
-
- /**
* Get all edges from the source node with the target type
* @param gm
* @param sourceNode
@@ -67,4 +57,15 @@ public interface EdgesObservable {
* @return
*/
Observable<Edge> edgesToTarget(final GraphManager gm, final Id targetNode);
+
+ /**
+ * Return an observable of all edges from a source node. Ordered descending, starting after the resume edge if specified
+ * @param gm
+ * @param sourceNode
+ * @param edgeType The edge type if specified. Otherwise all types will be used
+ * @param resume The edge to start seeking after. Otherwise starts at the most recent
+ * @return
+ */
+ Observable<Edge> edgesFromSourceDescending( final GraphManager gm, final Id sourceNode,
+ final Optional<String> edgeType, final Optional<Edge> resume );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/20c9b350/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java
index 7240798..18274ac 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java
@@ -72,7 +72,7 @@ public class EdgesObservableImpl implements EdgesObservable {
@Override
public Observable<Edge> edgesFromSourceDescending( final GraphManager gm, final Id sourceNode,
- final Optional<String> edgeTypeInput ) {
+ final Optional<String> edgeTypeInput, final Optional<Edge> resume ) {
@@ -86,7 +86,7 @@ public class EdgesObservableImpl implements EdgesObservable {
return gm.loadEdgesFromSource(
new SimpleSearchByEdgeType( sourceNode, edgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
- Optional.<Edge>absent() ) );
+ resume ) );
} );
}