You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/05/13 02:40:26 UTC
[4/4] incubator-usergrid git commit: Refactored observable methods to
correct name
Refactored observable methods to correct name
Added timestamp to message so that consumers can filter values for faster processing
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/cb179d35
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/cb179d35
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/cb179d35
Branch: refs/heads/USERGRID-643
Commit: cb179d3512952203d20c00773789df23e27f147d
Parents: b1d9ac2
Author: Todd Nine <tn...@apigee.com>
Authored: Tue May 12 17:40:20 2015 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Tue May 12 17:40:20 2015 -0700
----------------------------------------------------------------------
.../asyncevents/EventBuilder.java | 6 +-
.../asyncevents/EventBuilderImpl.java | 18 ++-
.../asyncevents/InMemoryAsyncEventService.java | 5 +-
.../asyncevents/SQSAsyncEventService.java | 9 +-
.../index/EntityIndexOperation.java | 46 +++++++
.../index/IndexServiceRequestBuilder.java | 88 +++++++++++++
.../index/IndexServiceRequestBuilderImpl.java | 130 +++++++++++++++++++
.../corepersistence/index/ReIndexAction.java | 2 +-
.../corepersistence/index/ReIndexService.java | 13 +-
.../index/ReIndexServiceImpl.java | 44 ++++---
.../rx/impl/AllEntityIdsObservable.java | 3 +-
.../rx/impl/AllEntityIdsObservableImpl.java | 5 +-
.../util/SerializableMapper.java | 91 -------------
.../rx/EdgesToTargetObservableIT.java | 4 +-
.../graph/serialization/EdgesObservable.java | 7 +-
.../serialization/impl/EdgesObservableImpl.java | 8 +-
.../impl/TargetIdObservableImpl.java | 2 +-
.../impl/migration/EdgeDataMigrationImpl.java | 2 +-
18 files changed, 342 insertions(+), 141 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cb179d35/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java
index f48451c..f9f157e 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java
@@ -22,8 +22,8 @@ package org.apache.usergrid.corepersistence.asyncevents;
import java.util.List;
+import org.apache.usergrid.corepersistence.index.EntityIndexOperation;
import org.apache.usergrid.persistence.collection.MvccLogEntry;
-import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.graph.Edge;
import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
@@ -72,10 +72,10 @@ public interface EventBuilder {
/**
* Re-index an entity in the scope provided
- * @param entityIdScope
+ * @param entityIndexOperation
* @return
*/
- Observable<IndexOperationMessage> index( EntityIdScope entityIdScope );
+ Observable<IndexOperationMessage> index( EntityIndexOperation entityIndexOperation );
/**
* A bean to hold both our observables so the caller can choose the subscription mechanism. Note that
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cb179d35/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
index c0d82d2..d35ed6d 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
@@ -25,12 +25,13 @@ import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.usergrid.corepersistence.index.EntityIndexOperation;
import org.apache.usergrid.corepersistence.index.IndexService;
+import org.apache.usergrid.persistence.Schema;
import org.apache.usergrid.persistence.collection.EntityCollectionManager;
import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
import org.apache.usergrid.persistence.collection.MvccLogEntry;
import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
-import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.graph.Edge;
import org.apache.usergrid.persistence.graph.GraphManager;
@@ -38,6 +39,7 @@ import org.apache.usergrid.persistence.graph.GraphManagerFactory;
import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
import org.apache.usergrid.persistence.model.entity.Entity;
import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.field.Field;
import com.google.inject.Inject;
import com.google.inject.Singleton;
@@ -140,14 +142,20 @@ public class EventBuilderImpl implements EventBuilder {
@Override
- public Observable<IndexOperationMessage> index( final EntityIdScope entityIdScope ) {
+ public Observable<IndexOperationMessage> index( final EntityIndexOperation entityIndexOperation ) {
- final ApplicationScope applicationScope = entityIdScope.getApplicationScope();
+ final ApplicationScope applicationScope = entityIndexOperation.getApplicationScope();
- final Id entityId = entityIdScope.getId();
+ final Id entityId = entityIndexOperation.getId();
//load the entity
- return entityCollectionManagerFactory.createCollectionManager( applicationScope ).load( entityId )
+ return entityCollectionManagerFactory.createCollectionManager( applicationScope ).load( entityId ).filter(
+ entity -> {
+ final Field<Long> modified = entity.getField( Schema.PROPERTY_MODIFIED );
+
+ //only re-index if the entity has a modified timestamp that is at or after our updatedSince timestamp
+ return modified != null && modified.getValue() >= entityIndexOperation.getUpdatedSince();
+ } )
//perform indexing on the task scheduler and start it
.flatMap( entity -> indexService.indexEntity( applicationScope, entity ) );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cb179d35/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
index 6faa695..96966bf 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
@@ -23,6 +23,7 @@ package org.apache.usergrid.corepersistence.asyncevents;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.usergrid.corepersistence.index.EntityIndexOperation;
import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
@@ -97,10 +98,10 @@ public class InMemoryAsyncEventService implements AsyncEventService {
@Override
- public void index( final EntityIdScope entityIdScope ) {
+ public void index( final EntityIndexOperation entityIndexOperation ) {
- run(eventBuilder.index( entityIdScope ));
+ run(eventBuilder.index( entityIndexOperation ));
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cb179d35/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/SQSAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/SQSAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/SQSAsyncEventService.java
index 415e5e8..1dbfd4e 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/SQSAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/SQSAsyncEventService.java
@@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.usergrid.corepersistence.index.EntityIndexOperation;
import org.apache.usergrid.corepersistence.index.IndexProcessorFig;
import org.apache.usergrid.corepersistence.index.IndexService;
import org.apache.usergrid.exception.NotImplementedException;
@@ -184,7 +185,7 @@ public class SQSAsyncEventService implements AsyncEventService {
}
- @Override
+// @Override
public void index( final EntityIdScope entityIdScope ) {
//queue the re-inex operation
offer( entityIdScope );
@@ -346,4 +347,10 @@ public class SQSAsyncEventService implements AsyncEventService {
subscriptions.add( subscription );
}
}
+
+
+ @Override
+ public void index( final EntityIndexOperation entityIdScope ) {
+ throw new NotImplementedException( "Implement me" );
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cb179d35/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/EntityIndexOperation.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/EntityIndexOperation.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/EntityIndexOperation.java
new file mode 100644
index 0000000..3548bbe
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/EntityIndexOperation.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.index;
+
+
+import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+
+/**
+ * The operation for re-indexing a single entity. It carries everything an EntityIdScope does
+ * (the application scope and the entity id) plus an updatedSince timestamp. EventBuilderImpl#index
+ * only re-indexes the entity when its modified timestamp is >= updatedSince.
+ */
+public class EntityIndexOperation extends EntityIdScope {
+
+ /**
+ * Inclusive lower bound for the entity's modified timestamp.
+ */
+ private final long updatedSince;
+
+
+ /**
+ * @param applicationScope The application scope, passed through to EntityIdScope
+ * @param id The id of the entity to re-index, passed through to EntityIdScope
+ * @param updatedSince The entity is only re-indexed if it was modified at or after this timestamp
+ */
+ public EntityIndexOperation( final ApplicationScope applicationScope, final Id id, final long updatedSince ) {
+ super( applicationScope, id );
+ this.updatedSince = updatedSince;
+ }
+
+
+ /**
+ * @return The updatedSince timestamp that was passed to the constructor
+ */
+ public long getUpdatedSince() {
+ return updatedSince;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cb179d35/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceRequestBuilder.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceRequestBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceRequestBuilder.java
new file mode 100644
index 0000000..07160d8
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceRequestBuilder.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.index;
+
+
+import java.util.UUID;
+
+import org.elasticsearch.action.index.IndexRequestBuilder;
+
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+
+import com.google.common.base.Optional;
+
+
+/**
+ * A builder interface to build our re-index request. The with* methods record the options of the
+ * request, the get* methods expose them to ReIndexService#rebuildIndex as Optionals.
+ */
+public interface IndexServiceRequestBuilder {
+
+ /**
+ * Set the application id. If not set, ReIndexServiceImpl re-indexes every application.
+ * @param applicationId The id of the application to re-index
+ * @return The builder, to continue configuring the request
+ */
+ IndexServiceRequestBuilder withApplicationId( final UUID applicationId );
+
+ /**
+ * Set the collection name. If not set, every collection will be reindexed
+ * @param collectionName The name of the collection to re-index
+ * @return The builder, to continue configuring the request
+ */
+ IndexServiceRequestBuilder withCollection( final String collectionName );
+
+ /**
+ * Set our cursor to resume processing.
+ * NOTE: ReIndexServiceImpl#rebuildIndex throws UnsupportedOperationException when a cursor is present.
+ * @param cursor The cursor returned by a previous re-index request
+ * @return The builder, to continue configuring the request
+ */
+ IndexServiceRequestBuilder withCursor(final String cursor);
+
+
+ /**
+ * Set the timestamp to re-index entities updated >= this timestamp. If not set, ReIndexServiceImpl
+ * falls back to Long.MIN_VALUE.
+ * @param timestamp The inclusive lower bound for the entity's modified timestamp
+ * @return The builder, to continue configuring the request
+ */
+ IndexServiceRequestBuilder withStartTimestamp(final Long timestamp);
+
+
+ /**
+ * Get the application scope
+ * @return The scope of the application set via withApplicationId, if any
+ */
+ Optional<ApplicationScope> getApplicationScope();
+
+ /**
+ * Get the collection name
+ * @return The collection name set via withCollection, if any
+ */
+ Optional<String> getCollectionName();
+
+ /**
+ * Get the cursor
+ * @return The cursor set via withCursor, if any
+ */
+ Optional<String> getCursor();
+
+ /**
+ * Get the updated since timestamp
+ * @return The timestamp set via withStartTimestamp, if any
+ */
+ Optional<Long> getUpdateTimestamp();
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cb179d35/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceRequestBuilderImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceRequestBuilderImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceRequestBuilderImpl.java
new file mode 100644
index 0000000..3466674
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceRequestBuilderImpl.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.index;
+
+
+import java.util.UUID;
+
+
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+
+import com.google.common.base.Optional;
+
+
+/**
+ * Default implementation of {@link IndexServiceRequestBuilder}: a mutable holder for the options of a
+ * re-index request. Every option starts out absent, so each getter returns a non-null Optional even
+ * when the matching with* method was never called. Passing null to a with* method resets that option
+ * to absent. No synchronization is performed.
+ */
+public class IndexServiceRequestBuilderImpl implements IndexServiceRequestBuilder {
+
+    //start every option as absent rather than null: ReIndexServiceImpl#rebuildIndex dereferences the
+    //Optionals returned by the getters, which would throw a NullPointerException for any unset option
+    private Optional<UUID> withApplicationId = Optional.absent();
+    private Optional<String> withCollectionName = Optional.absent();
+    private Optional<String> cursor = Optional.absent();
+    private Optional<Long> updateTimestamp = Optional.absent();
+
+
+    /**
+     * Set the application to re-index.
+     *
+     * @param applicationId The application id, or null to clear the option
+     * @return This builder
+     */
+    @Override
+    public IndexServiceRequestBuilder withApplicationId( final UUID applicationId ) {
+        this.withApplicationId = Optional.fromNullable( applicationId );
+        return this;
+    }
+
+
+    /**
+     * Set the collection to re-index.
+     *
+     * @param collectionName The collection name, or null to clear the option
+     * @return This builder
+     */
+    @Override
+    public IndexServiceRequestBuilder withCollection( final String collectionName ) {
+        this.withCollectionName = Optional.fromNullable( collectionName );
+        return this;
+    }
+
+
+    /**
+     * Set the cursor to resume processing from.
+     *
+     * @param cursor The cursor, or null to clear the option
+     * @return This builder
+     */
+    @Override
+    public IndexServiceRequestBuilder withCursor( final String cursor ) {
+        this.cursor = Optional.fromNullable( cursor );
+        return this;
+    }
+
+
+    /**
+     * Set the timestamp to re-index entities updated >= this timestamp.
+     *
+     * @param timestamp The timestamp, or null to clear the option
+     * @return This builder
+     */
+    @Override
+    public IndexServiceRequestBuilder withStartTimestamp( final Long timestamp ) {
+        this.updateTimestamp = Optional.fromNullable( timestamp );
+        return this;
+    }
+
+
+    /**
+     * Get the application scope for the configured application id.
+     *
+     * @return The scope CpNamingUtils builds for the application id, or absent if no id is set
+     */
+    @Override
+    public Optional<ApplicationScope> getApplicationScope() {
+
+        if ( !withApplicationId.isPresent() ) {
+            return Optional.absent();
+        }
+
+        return Optional.of( CpNamingUtils.getApplicationScope( withApplicationId.get() ) );
+    }
+
+
+    /**
+     * @return The collection name, or absent if none is set
+     */
+    @Override
+    public Optional<String> getCollectionName() {
+        return withCollectionName;
+    }
+
+
+    /**
+     * @return The cursor, or absent if none is set
+     */
+    @Override
+    public Optional<String> getCursor() {
+        return cursor;
+    }
+
+
+    /**
+     * @return The start timestamp, or absent if none is set
+     */
+    @Override
+    public Optional<Long> getUpdateTimestamp() {
+        return updateTimestamp;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cb179d35/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java
index b878246..672b3c8 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java
@@ -33,5 +33,5 @@ public interface ReIndexAction {
* Index this entity with the specified scope
* @param entityIdScope
*/
- void index( final EntityIdScope entityIdScope );
+ void index( final EntityIndexOperation entityIdScope );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cb179d35/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java
index e594ad3..b25eca5 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java
@@ -45,16 +45,17 @@ public interface ReIndexService {
/**
* Perform an index rebuild
*
- * @param appId The applicationId to re-index, or all applications if absent
- * @param collection The collection name to re-index. Otherwise all collections in an app will be used.
- * @param cursor An optional cursor to resume processing
- * @param startTimestamp The time to start indexing from. All edges >= this time will be indexed.
+ * @param indexServiceRequestBuilder The builder to build the request
* @return
*/
- IndexResponse rebuildIndex( final Optional<UUID> appId, final Optional<String> collection, final Optional<String> cursor,
- final Optional<Long> startTimestamp);
+ IndexResponse rebuildIndex(final IndexServiceRequestBuilder indexServiceRequestBuilder);
+ /**
+ * Create a new builder for an index rebuild request
+ * @return
+ */
+ IndexServiceRequestBuilder getBuilder();
/**
* The response when requesting a re-index operation
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cb179d35/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
index bd1bff9..a2fa09a 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
@@ -20,7 +20,6 @@
package org.apache.usergrid.corepersistence.index;
-import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
@@ -28,7 +27,6 @@ import org.apache.usergrid.corepersistence.rx.impl.AllApplicationsObservable;
import org.apache.usergrid.corepersistence.rx.impl.AllEntityIdsObservable;
import org.apache.usergrid.corepersistence.rx.impl.EdgeScope;
import org.apache.usergrid.corepersistence.util.CpNamingUtils;
-import org.apache.usergrid.corepersistence.util.SerializableMapper;
import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
@@ -46,13 +44,11 @@ import com.google.inject.Singleton;
import rx.Observable;
import rx.observables.ConnectableObservable;
-import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getApplicationScope;
-
@Singleton
public class ReIndexServiceImpl implements ReIndexService {
- private static final MapScope RESUME_MAP_SCOPTE =
+ private static final MapScope RESUME_MAP_SCOPE =
new MapScopeImpl( CpNamingUtils.getManagementApplicationId(), "reindexresume" );
//Keep cursors to resume re-index for 1 day. This is far beyond it's useful real world implications anyway.
@@ -78,31 +74,41 @@ public class ReIndexServiceImpl implements ReIndexService {
this.rxTaskScheduler = rxTaskScheduler;
this.indexService = indexService;
- this.mapManager = mapManagerFactory.createMapManager( RESUME_MAP_SCOPTE );
+ this.mapManager = mapManagerFactory.createMapManager( RESUME_MAP_SCOPE );
}
+
+
@Override
- public IndexResponse rebuildIndex( final Optional<UUID> appId, final Optional<String> collection, final Optional<String> cursor,
- final Optional<Long> startTimestamp ) {
+ public IndexResponse rebuildIndex( final IndexServiceRequestBuilder indexServiceRequestBuilder ) {
- //load our last emitted Scope if a cursor is present
- if ( cursor.isPresent() ) {
+ //load our last emitted Scope if a cursor is present
+ if ( indexServiceRequestBuilder.getCursor().isPresent() ) {
throw new UnsupportedOperationException( "Build this" );
}
- final Observable<ApplicationScope> applicationScopes = appId.isPresent()? Observable.just( getApplicationScope(appId.get()) ) : allApplicationsObservable.getData();
+ final Optional<ApplicationScope> appId = indexServiceRequestBuilder.getApplicationScope();
+ final Observable<ApplicationScope> applicationScopes = appId.isPresent()? Observable.just( appId.get() ) : allApplicationsObservable.getData();
+
+
+
final String newCursor = StringUtils.sanitizeUUID( UUIDGenerator.newTimeUUID() );
+ final long modifiedSince = indexServiceRequestBuilder.getUpdateTimestamp().or( Long.MIN_VALUE );
+
//create an observable that loads each entity and indexes it, start it running with publish
final ConnectableObservable<EdgeScope> runningReIndex =
- allEntityIdsObservable.getEdgesToEntities( applicationScopes, collection, startTimestamp )
+ allEntityIdsObservable.getEdgesToEntities( applicationScopes,
+ indexServiceRequestBuilder.getCollectionName() )
//for each edge, create our scope and index on it
- .doOnNext( edge -> indexService.index( new EntityIdScope( edge.getApplicationScope(), edge.getEdge().getTargetNode() ) ) ).publish();
+ .doOnNext( edge -> indexService.index(
+ new EntityIndexOperation( edge.getApplicationScope(), edge.getEdge().getTargetNode(),
+ modifiedSince ) ) ).publish();
@@ -112,9 +118,9 @@ public class ReIndexServiceImpl implements ReIndexService {
rxTaskScheduler.getAsyncIOScheduler() )
.doOnNext( edge -> {
- final String serializedState = SerializableMapper.asString( edge );
-
- mapManager.putString( newCursor, serializedState, INDEX_TTL );
+// final String serializedState = SerializableMapper.asString( edge );
+//
+// mapManager.putString( newCursor, serializedState, INDEX_TTL );
} ).subscribe();
@@ -124,6 +130,12 @@ public class ReIndexServiceImpl implements ReIndexService {
return new IndexResponse( newCursor, runningReIndex );
}
+
+
+ @Override
+ public IndexServiceRequestBuilder getBuilder() {
+ return new IndexServiceRequestBuilderImpl();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cb179d35/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservable.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservable.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservable.java
index b9e5373..aada240 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservable.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservable.java
@@ -44,9 +44,8 @@ public interface AllEntityIdsObservable {
* Get all edges that represent edges to entities in the system
* @param appScopes
* @param edgeType The edge type to use (if specified)
- * @param startTime The time to start with
* @return
*/
- Observable<EdgeScope> getEdgesToEntities(final Observable<ApplicationScope> appScopes, final Optional<String> edgeType, final Optional<Long> startTime);
+ Observable<EdgeScope> getEdgesToEntities(final Observable<ApplicationScope> appScopes, final Optional<String> edgeType);
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cb179d35/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservableImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservableImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservableImpl.java
index 257fab1..6a95e7b 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservableImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservableImpl.java
@@ -81,12 +81,13 @@ public class AllEntityIdsObservableImpl implements AllEntityIdsObservable {
@Override
- public Observable<EdgeScope> getEdgesToEntities( final Observable<ApplicationScope> appScopes, final Optional<String> edgeType, final Optional<Long> startTime) {
+ public Observable<EdgeScope> getEdgesToEntities( final Observable<ApplicationScope> appScopes, final Optional<String> edgeType) {
return appScopes.flatMap( applicationScope -> {
final GraphManager gm = graphManagerFactory.createEdgeManager( applicationScope );
- return edgesObservable.edgesFromSourceAscending( gm, applicationScope.getApplication(),edgeType, startTime ).map( edge -> new EdgeScope(applicationScope, edge ));
+ return edgesObservable.edgesFromSourceDescending( gm, applicationScope.getApplication(), edgeType )
+ .map( edge -> new EdgeScope(applicationScope, edge ));
} );
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cb179d35/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/SerializableMapper.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/SerializableMapper.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/SerializableMapper.java
deleted file mode 100644
index 19ecf6d..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/SerializableMapper.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.util;
-
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.Serializable;
-import java.nio.charset.StandardCharsets;
-
-import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
-import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.dataformat.smile.SmileFactory;
-import com.google.common.base.Preconditions;
-
-
-/**
- * A simple utility for serializing serializable classes to/and from strings. To be used for small object storage only, such as resume on re-index
- * storing data such as entities should be specialized.
- */
-public class SerializableMapper {
-
- private static final SmileFactory SMILE_FACTORY = new SmileFactory();
-
- private static final ObjectMapper MAPPER = new ObjectMapper( SMILE_FACTORY );
-
- static{
- MAPPER.enableDefaultTypingAsProperty( ObjectMapper.DefaultTyping.JAVA_LANG_OBJECT, "@class" );
- SMILE_FACTORY.delegateToTextual( true );
- }
-
- /**
- * Get value as a string
- * @param toSerialize
- * @param <T>
- * @return
- */
- public static <T extends Serializable> String asString(final T toSerialize){
- try {
- return MAPPER.writeValueAsString( toSerialize );
- }
- catch ( JsonProcessingException e ) {
- throw new RuntimeException( "Unable to process json", e );
- }
- }
-
-
- /**
- * Write the value as a string
- * @param <T>
- * @param serialized
- * @param clazz
- * @return
- */
- public static <T extends Serializable> T fromString(final String serialized, final Class<T> clazz){
- Preconditions.checkNotNull(serialized, "serialized string cannot be null");
-
-
- InputStream stream = new ByteArrayInputStream(serialized.getBytes( StandardCharsets.UTF_8));
-
- try {
- return MAPPER.readValue( stream, clazz );
- }
- catch ( IOException e ) {
- throw new RuntimeException( String.format("Unable to parse string '%s'", serialized), e );
- }
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cb179d35/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java
index 92f2b01..9e84219 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java
@@ -89,7 +89,7 @@ public class EdgesToTargetObservableIT extends AbstractCoreIT {
final GraphManager gm = managerCache.getGraphManager( scope );
- edgesFromSourceObservable.edgesFromSourceAscending( gm, applicationId ).doOnNext( edge -> {
+ edgesFromSourceObservable.edgesFromSourceDescending( gm, applicationId ).doOnNext( edge -> {
final String edgeType = edge.getType();
final Id target = edge.getTargetNode();
@@ -118,7 +118,7 @@ public class EdgesToTargetObservableIT extends AbstractCoreIT {
//test connections
- edgesFromSourceObservable.edgesFromSourceAscending( gm, source ).doOnNext( edge -> {
+ edgesFromSourceObservable.edgesFromSourceDescending( gm, source ).doOnNext( edge -> {
final String edgeType = edge.getType();
final Id target = edge.getTargetNode();
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cb179d35/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java
index 9f0bd60..964e13d 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java
@@ -38,7 +38,7 @@ public interface EdgesObservable {
* @param sourceNode
* @return
*/
- Observable<Edge> edgesFromSourceAscending( final GraphManager gm, final Id sourceNode );
+ Observable<Edge> edgesFromSourceDescending( final GraphManager gm, final Id sourceNode );
/**
@@ -46,11 +46,10 @@ public interface EdgesObservable {
* @param gm
* @param sourceNode
* @param edgeType The edge type if specified. Otherwise all types will be used
- * @param startTimestamp The start timestamp if specfiied, otherwise Long.MIN will be used
* @return
*/
- Observable<Edge> edgesFromSourceAscending( final GraphManager gm, final Id sourceNode,final Optional<String> edgeType,
- final Optional<Long> startTimestamp );
+ Observable<Edge> edgesFromSourceDescending( final GraphManager gm, final Id sourceNode,
+ final Optional<String> edgeType );
/**
* Get all edges from the source node with the target type
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cb179d35/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java
index df9e094..7240798 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java
@@ -55,7 +55,7 @@ public class EdgesObservableImpl implements EdgesObservable {
* Get all edges from the source
*/
@Override
- public Observable<Edge> edgesFromSourceAscending( final GraphManager gm, final Id sourceNode ) {
+ public Observable<Edge> edgesFromSourceDescending( final GraphManager gm, final Id sourceNode ) {
final Observable<String> edgeTypes =
gm.getEdgeTypesFromSource( new SimpleSearchEdgeType( sourceNode, null, null ) );
@@ -71,8 +71,8 @@ public class EdgesObservableImpl implements EdgesObservable {
@Override
- public Observable<Edge> edgesFromSourceAscending( final GraphManager gm, final Id sourceNode, final Optional<String> edgeTypeInput,
- final Optional<Long> startTimestamp ) {
+ public Observable<Edge> edgesFromSourceDescending( final GraphManager gm, final Id sourceNode,
+ final Optional<String> edgeTypeInput ) {
@@ -85,7 +85,7 @@ public class EdgesObservableImpl implements EdgesObservable {
logger.debug( "Loading edges of edgeType {} from {}", edgeType, sourceNode );
return gm.loadEdgesFromSource(
- new SimpleSearchByEdgeType( sourceNode, edgeType, startTimestamp.or( Long.MIN_VALUE ), SearchByEdgeType.Order.ASCENDING,
+ new SimpleSearchByEdgeType( sourceNode, edgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
Optional.<Edge>absent() ) );
} );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cb179d35/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/TargetIdObservableImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/TargetIdObservableImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/TargetIdObservableImpl.java
index 5cf5117..82c7d54 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/TargetIdObservableImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/TargetIdObservableImpl.java
@@ -55,7 +55,7 @@ public class TargetIdObservableImpl implements TargetIdObservable {
public Observable<Id> getTargetNodes(final GraphManager gm, final Id sourceNode) {
//only search edge types that start with collections
- return edgesFromSourceObservable.edgesFromSourceAscending( gm, sourceNode ).map( new Func1<Edge, Id>() {
+ return edgesFromSourceObservable.edgesFromSourceDescending( gm, sourceNode ).map( new Func1<Edge, Id>() {
@Override
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cb179d35/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/EdgeDataMigrationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/EdgeDataMigrationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/EdgeDataMigrationImpl.java
index 0df26ff..d6c42e3 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/EdgeDataMigrationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/EdgeDataMigrationImpl.java
@@ -87,7 +87,7 @@ public class EdgeDataMigrationImpl implements DataMigration<GraphNode> {
final GraphManager gm = graphManagerFactory.createEdgeManager( graphNode.applicationScope );
//get edges from the source
- return edgesFromSourceObservable.edgesFromSourceAscending( gm, graphNode.entryNode ).buffer( 1000 )
+ return edgesFromSourceObservable.edgesFromSourceDescending( gm, graphNode.entryNode ).buffer( 1000 )
.doOnNext( edges -> {
final MutationBatch batch = keyspace.prepareMutationBatch();