You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2016/10/24 13:06:55 UTC
[70/83] [abbrv] usergrid git commit: Revert "Add ability to start initial re-index seek with the UNIX timestamp. This will only start seeking from the time provided, rather than seeking all and discarding what doesn't match a filter."
Revert "Add ability to start initial re-index seek with the UNIX timestamp. This will only start seeking from the time provided, rather than seeking all and discarding what doesn't match a filter."
This reverts commit 85cc93436a163c3ba21a7ac1286c6bce3daebeb4.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/5db402d5
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/5db402d5
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/5db402d5
Branch: refs/heads/asf-site
Commit: 5db402d53e40b99901b2a97894cbdd77e60881b3
Parents: 85cc934
Author: Michael Russo <mr...@apigee.com>
Authored: Mon Oct 3 21:46:46 2016 -0700
Committer: Michael Russo <mr...@apigee.com>
Committed: Mon Oct 3 21:46:46 2016 -0700
----------------------------------------------------------------------
.../corepersistence/index/ReIndexServiceImpl.java | 18 ++++--------------
.../rx/impl/AllEntityIdsObservable.java | 6 +-----
.../rx/impl/AllEntityIdsObservableImpl.java | 7 ++-----
.../graph/serialization/EdgesObservable.java | 4 +---
.../serialization/impl/EdgesObservableImpl.java | 16 +++-------------
.../usergrid/rest/system/IndexResource.java | 12 ++++++------
6 files changed, 17 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/5db402d5/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
index f37f9af..19fbcfa 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
@@ -135,17 +135,7 @@ public class ReIndexServiceImpl implements ReIndexService {
final String jobId = StringUtils.sanitizeUUID( UUIDGenerator.newTimeUUID() );
- final long startTimestamp;
- if ( reIndexRequestBuilder.getUpdateTimestamp().isPresent() && reIndexRequestBuilder.getUpdateTimestamp().get() > 0 ){
-
- // edge timestamps are UUID timestamps, we need to convert from UNIX epoch to a UUID timestamp
- long uuidEpochNanos = 0x01b21dd213814000L; // num 100 nano seconds since uuid epoch
- startTimestamp = reIndexRequestBuilder.getUpdateTimestamp().get()*10000 + uuidEpochNanos;
- logger.info("Reindex provided with from timestamp, converted to an Edge timestamp is: {}", startTimestamp);
- }else{
- startTimestamp = 0;
- }
-
+ final long modifiedSince = reIndexRequestBuilder.getUpdateTimestamp().or( Long.MIN_VALUE );
// create an observable that loads a batch to be indexed
@@ -175,11 +165,11 @@ public class ReIndexServiceImpl implements ReIndexService {
}
allEntityIdsObservable.getEdgesToEntities( applicationScopes,
- reIndexRequestBuilder.getCollectionName(), cursorSeek.getSeekValue(), startTimestamp )
+ reIndexRequestBuilder.getCollectionName(), cursorSeek.getSeekValue() )
.buffer( indexProcessorFig.getReindexBufferSize())
.doOnNext( edgeScopes -> {
logger.info("Sending batch of {} to be indexed.", edgeScopes.size());
- indexService.indexBatch(edgeScopes, startTimestamp);
+ indexService.indexBatch(edgeScopes, modifiedSince);
count.addAndGet(edgeScopes.size() );
if( edgeScopes.size() > 0 ) {
writeCursorState(jobId, edgeScopes.get(edgeScopes.size() - 1));
@@ -188,7 +178,7 @@ public class ReIndexServiceImpl implements ReIndexService {
.doOnCompleted(() -> writeStateMeta( jobId, Status.COMPLETE, count.get(), System.currentTimeMillis() ))
.subscribeOn( Schedulers.io() ).subscribe();
-
+
return new ReIndexStatus( jobId, Status.STARTED, 0, 0 );
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/5db402d5/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservable.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservable.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservable.java
index fe7a455..9070609 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservable.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservable.java
@@ -46,12 +46,8 @@ public interface AllEntityIdsObservable {
* @param appScopes
* @param edgeType The edge type to use (if specified)
* @param lastEdge The edge to resume processing from
- * @param startTimestamp An optional unix timestamp to start the seek ( it will be converted to an Edge )
* @return
*/
- Observable<EdgeScope> getEdgesToEntities( final Observable<ApplicationScope> appScopes,
- final Optional<String> edgeType,
- Optional<Edge> lastEdge,
- final long startTimestamp );
+ Observable<EdgeScope> getEdgesToEntities(final Observable<ApplicationScope> appScopes, final Optional<String> edgeType, final Optional<Edge> lastEdge);
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/5db402d5/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservableImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservableImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservableImpl.java
index e6f3633..0420a32 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservableImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservableImpl.java
@@ -82,15 +82,12 @@ public class AllEntityIdsObservableImpl implements AllEntityIdsObservable {
@Override
- public Observable<EdgeScope> getEdgesToEntities( final Observable<ApplicationScope> appScopes,
- final Optional<String> edgeType,
- final Optional<Edge> lastEdge,
- final long startTimestamp ) {
+ public Observable<EdgeScope> getEdgesToEntities( final Observable<ApplicationScope> appScopes, final Optional<String> edgeType, final Optional<Edge> lastEdge) {
return appScopes.flatMap( applicationScope -> {
final GraphManager gm = graphManagerFactory.createEdgeManager( applicationScope );
- return edgesObservable.edgesFromSourceDescending( gm, applicationScope.getApplication(), edgeType, lastEdge, startTimestamp )
+ return edgesObservable.edgesFromSourceDescending( gm, applicationScope.getApplication(), edgeType, lastEdge )
.map( edge -> new EdgeScope(applicationScope, edge ));
} );
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/5db402d5/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java
index 7c83207..78a1d4b 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java
@@ -64,10 +64,8 @@ public interface EdgesObservable {
* @param sourceNode
* @param edgeType The edge type if specified. Otherwise all types will be used
* @param resume The edge to start seeking after. Otherwise starts at the most recent
- * @param startTimestamp A unix timestamp to start seeking from if you don't have the edge cursor
* @return
*/
Observable<Edge> edgesFromSourceDescending( final GraphManager gm, final Id sourceNode,
- final Optional<String> edgeType, final Optional<Edge> resume,
- final long startTimestamp );
+ final Optional<String> edgeType, final Optional<Edge> resume );
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/5db402d5/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java
index 2504e87..20efe42 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java
@@ -20,7 +20,6 @@
package org.apache.usergrid.persistence.graph.serialization.impl;
-import org.apache.usergrid.persistence.graph.impl.SimpleEdge;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -75,8 +74,8 @@ public class EdgesObservableImpl implements EdgesObservable {
@Override
public Observable<Edge> edgesFromSourceDescending( final GraphManager gm, final Id sourceNode,
- final Optional<String> edgeTypeInput, final Optional<Edge> resume,
- final long startTimestamp ) {
+ final Optional<String> edgeTypeInput, final Optional<Edge> resume ) {
+
final Observable<String> edgeTypes = edgeTypeInput.isPresent()? Observable.just( edgeTypeInput.get() ):
@@ -85,22 +84,13 @@ public class EdgesObservableImpl implements EdgesObservable {
return edgeTypes.flatMap( edgeType -> {
- final Optional<Edge> start;
-
- if( !resume.isPresent() && startTimestamp > 0 ){
- // the target node doesn't matter here, the search only looks at the timestamp
- start = Optional.of(new SimpleEdge(sourceNode, edgeType, sourceNode, startTimestamp));
- }else{
- start = resume;
- }
-
if (logger.isTraceEnabled()) {
logger.trace("Loading edges of edgeType {} from {}", edgeType, sourceNode);
}
return gm.loadEdgesFromSource(
new SimpleSearchByEdgeType( sourceNode, edgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
- start ) );
+ resume ) );
} );
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/5db402d5/stack/rest/src/main/java/org/apache/usergrid/rest/system/IndexResource.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/system/IndexResource.java b/stack/rest/src/main/java/org/apache/usergrid/rest/system/IndexResource.java
index 2be5b87..be60177 100644
--- a/stack/rest/src/main/java/org/apache/usergrid/rest/system/IndexResource.java
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/system/IndexResource.java
@@ -61,7 +61,7 @@ import java.util.UUID;
public class IndexResource extends AbstractContextResource {
private static final Logger logger = LoggerFactory.getLogger( IndexResource.class );
- private static final String SINCE_FIELD = "since";
+ private static final String UPDATED_FIELD = "updated";
@@ -321,17 +321,17 @@ public class IndexResource extends AbstractContextResource {
final String callback ) {
Map<String,Object> newPayload = payload;
- if(newPayload == null || !payload.containsKey(SINCE_FIELD)){
+ if(newPayload == null || !payload.containsKey( UPDATED_FIELD )){
newPayload = new HashMap<>(1);
- newPayload.put(SINCE_FIELD,0);
+ newPayload.put(UPDATED_FIELD,0);
}
- Preconditions.checkArgument(newPayload.get(SINCE_FIELD) instanceof Number,
+ Preconditions.checkArgument(newPayload.get(UPDATED_FIELD) instanceof Number,
"You must specified the field \"updated\" in the payload and it must be a timestamp" );
//add our updated timestamp to the request
- if ( newPayload.containsKey(SINCE_FIELD) ) {
- final long timestamp = ConversionUtils.getLong(newPayload.get(SINCE_FIELD));
+ if ( newPayload.containsKey( UPDATED_FIELD ) ) {
+ final long timestamp = ConversionUtils.getLong(newPayload.get(UPDATED_FIELD));
request.withStartTimestamp( timestamp );
}