You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by gr...@apache.org on 2015/05/21 01:00:31 UTC
[01/10] incubator-usergrid git commit: Update of Apis and tests
Repository: incubator-usergrid
Updated Branches:
refs/heads/two-dot-o-dev b637535dd -> b28aeee05
Update of Apis and tests
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/dbfff997
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/dbfff997
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/dbfff997
Branch: refs/heads/two-dot-o-dev
Commit: dbfff9970f844f2264113fc05ee290aa3ca0ae50
Parents: 30e0e45
Author: Todd Nine <tn...@apigee.com>
Authored: Mon May 11 09:27:28 2015 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Tue May 12 10:29:41 2015 -0700
----------------------------------------------------------------------
.../corepersistence/CpEntityManagerFactory.java | 14 ------
.../corepersistence/index/ReIndexAction.java | 4 ++
.../corepersistence/index/ReIndexService.java | 11 ++--
.../index/ReIndexServiceImpl.java | 5 +-
.../persistence/EntityManagerFactory.java | 2 -
.../PerformanceEntityRebuildIndexTest.java | 53 ++++++--------------
6 files changed, 27 insertions(+), 62 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/dbfff997/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
index e796545..13b6433 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
@@ -687,20 +687,6 @@ public class CpEntityManagerFactory implements EntityManagerFactory, Application
}
}
- @Override
- public ReIndexService.IndexResponse rebuildCollectionIndex( Optional<UUID> appId, Optional<String> collection ) {
- throw new UnsupportedOperationException( "Implement me" );
-//
-// EntityManager em = getEntityManager( appId );
-//
-// //explicitly invoke create index, we don't know if it exists or not in ES during a rebuild.
-// Application app = em.getApplication();
-//
-// em.reindexCollection(po, collectionName, reverse);
-//
-// logger.info("\n\nRebuilt index for application {} id {} collection {}\n",
-// new Object[]{app.getName(), appId, collectionName});
- }
@Override
public void addIndex(final String indexSuffix,final int shards,final int replicas, final String writeConsistency){
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/dbfff997/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java
index 086b2aa..b878246 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java
@@ -29,5 +29,9 @@ import org.apache.usergrid.persistence.collection.serialization.impl.migration.E
@FunctionalInterface
public interface ReIndexAction {
+ /**
+ * Index this entity with the specified scope
+ * @param entityIdScope
+ */
void index( final EntityIdScope entityIdScope );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/dbfff997/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java
index 91409fe..e594ad3 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java
@@ -44,12 +44,15 @@ public interface ReIndexService {
/**
* Perform an index rebuild
- * @param appId
- * @param collection
+ *
+ * @param appId The applicationId to re-index, or all applications if absent
+ * @param collection The collection name to re-index. Otherwise all collections in an app will be used.
+ * @param cursor An optional cursor to resume processing
+ * @param startTimestamp The time to start indexing from. All edges >= this time will be indexed.
* @return
*/
- IndexResponse rebuildIndex( final Optional<UUID> appId, final Optional<String> collection, final Optional<String> collectionName, final Optional<String> cursor,
- final Optional<Long> startTimestamp );
+ IndexResponse rebuildIndex( final Optional<UUID> appId, final Optional<String> collection, final Optional<String> cursor,
+ final Optional<Long> startTimestamp);
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/dbfff997/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
index be5bcab..bd1bff9 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
@@ -84,8 +84,7 @@ public class ReIndexServiceImpl implements ReIndexService {
@Override
- public IndexResponse rebuildIndex( final Optional<UUID> appId, final Optional<String> collection,
- final Optional<String> collectionName, final Optional<String> cursor,
+ public IndexResponse rebuildIndex( final Optional<UUID> appId, final Optional<String> collection, final Optional<String> cursor,
final Optional<Long> startTimestamp ) {
//load our last emitted Scope if a cursor is present
@@ -100,7 +99,7 @@ public class ReIndexServiceImpl implements ReIndexService {
//create an observable that loads each entity and indexes it, start it running with publish
final ConnectableObservable<EdgeScope> runningReIndex =
- allEntityIdsObservable.getEdgesToEntities( applicationScopes, collectionName, startTimestamp )
+ allEntityIdsObservable.getEdgesToEntities( applicationScopes, collection, startTimestamp )
//for each edge, create our scope and index on it
.doOnNext( edge -> indexService.index( new EntityIdScope( edge.getApplicationScope(), edge.getEdge().getTargetNode() ) ) ).publish();
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/dbfff997/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManagerFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManagerFactory.java b/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManagerFactory.java
index b3f4b62..cd7e515 100644
--- a/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManagerFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/persistence/EntityManagerFactory.java
@@ -174,8 +174,6 @@ public interface EntityManagerFactory {
/** For testing purposes */
public void flushEntityManagerCaches();
- ReIndexService.IndexResponse rebuildCollectionIndex( Optional<UUID> appId, Optional<String> collection );
-
/**
* Add a new index to the application for scale
* @param suffix unique indentifier for additional index
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/dbfff997/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java b/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
index df2a762..a17c925 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
@@ -26,6 +26,8 @@ import java.util.concurrent.TimeUnit;
import com.google.common.base.Optional;
import org.apache.commons.lang.RandomStringUtils;
+
+import org.apache.usergrid.corepersistence.index.ReIndexService;
import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
import org.junit.After;
import org.junit.Before;
@@ -63,9 +65,8 @@ public class PerformanceEntityRebuildIndexTest extends AbstractCoreIT {
private static final MetricRegistry registry = new MetricRegistry();
private Slf4jReporter reporter;
- private static final long RUNTIME_MS = TimeUnit.SECONDS.toMillis( 10 );
+ private static final int ENTITIES_TO_INDEX = 2000;
- private static final long WRITE_DELAY_MS = 10;
@@ -99,6 +100,8 @@ public class PerformanceEntityRebuildIndexTest extends AbstractCoreIT {
final EntityManager em = setup.getEmf().getEntityManager( appId );
+ final ReIndexService reIndexService = setup.getInjector().getInstance( ReIndexService.class );
+
// ----------------- create a bunch of entities
Map<String, Object> entityMap = new HashMap<String, Object>() {{
@@ -107,37 +110,18 @@ public class PerformanceEntityRebuildIndexTest extends AbstractCoreIT {
put("key3", "Some value");
}};
-// Map<String, Object> cat1map = new HashMap<String, Object>() {{
-// put("name", "enzo");
-// put("color", "orange");
-// }};
-// Map<String, Object> cat2map = new HashMap<String, Object>() {{
-// put("name", "marquee");
-// put("color", "grey");
-// }};
-// Map<String, Object> cat3map = new HashMap<String, Object>() {{
-// put("name", "bertha");
-// put("color", "tabby");
-// }};
-//
-// Entity cat1 = em.create("cat", cat1map );
-// Entity cat2 = em.create("cat", cat2map );
-// Entity cat3 = em.create("cat", cat3map );
-
- final long stopTime = System.currentTimeMillis() + RUNTIME_MS;
List<EntityRef> entityRefs = new ArrayList<EntityRef>();
- int entityCount = 0;
int herderCount = 0;
int shepardCount = 0;
- while ( System.currentTimeMillis() < stopTime ) {
+ for (int i = 0; i < ENTITIES_TO_INDEX; i++) {
final Entity entity;
try {
- entityMap.put("key", entityCount );
+ entityMap.put("key", i );
- if ( entityCount % 2 == 0 ) {
+ if ( i % 2 == 0 ) {
entity = em.create("catherder", entityMap);
herderCount++;
} else {
@@ -156,15 +140,13 @@ public class PerformanceEntityRebuildIndexTest extends AbstractCoreIT {
}
entityRefs.add(new SimpleEntityRef( entity.getType(), entity.getUuid() ) );
- if ( entityCount % 10 == 0 ) {
- logger.info("Created {} entities", entityCount );
+ if ( i % 10 == 0 ) {
+ logger.info("Created {} entities", i );
}
- entityCount++;
- try { Thread.sleep( WRITE_DELAY_MS ); } catch (InterruptedException ignored ) {}
}
- logger.info("Created {} entities", entityCount);
+ logger.info("Created {} entities", ENTITIES_TO_INDEX);
app.refreshIndex();
// ----------------- test that we can read them, should work fine
@@ -182,7 +164,7 @@ public class PerformanceEntityRebuildIndexTest extends AbstractCoreIT {
logger.debug("Reading data, should fail this time ");
try {
- readData( em, "testTypes", entityCount, 0 );
+ readData( em, "testTypes", ENTITIES_TO_INDEX, 0 );
fail("should have failed to read data");
} catch (Exception expected) {}
@@ -214,10 +196,7 @@ public class PerformanceEntityRebuildIndexTest extends AbstractCoreIT {
try {
-// // do it forwards
- setup.getEmf().rebuildCollectionIndex(Optional.of(em.getApplicationId()), Optional.<String>of("catherders"));
-//
-// // and backwards, just to make sure both cases are covered
+ reIndexService.rebuildIndex( Optional.of( em.getApplicationId()), Optional.<String>of("catherders"), Optional.absent(), Optional.absent() );
reporter.report();
registry.remove( meterName );
@@ -269,11 +248,9 @@ public class PerformanceEntityRebuildIndexTest extends AbstractCoreIT {
Entity cat2 = em.create("cat", cat2map );
Entity cat3 = em.create("cat", cat3map );
- final long stopTime = System.currentTimeMillis() + RUNTIME_MS;
-
List<EntityRef> entityRefs = new ArrayList<EntityRef>();
int entityCount = 0;
- while ( System.currentTimeMillis() < stopTime ) {
+ for (int i = 0; i < ENTITIES_TO_INDEX; i++) {
final Entity entity;
@@ -295,8 +272,6 @@ public class PerformanceEntityRebuildIndexTest extends AbstractCoreIT {
logger.info("Created {} entities", entityCount );
}
- entityCount++;
- try { Thread.sleep( WRITE_DELAY_MS ); } catch (InterruptedException ignored ) {}
}
logger.info("Created {} entities", entityCount);
[02/10] incubator-usergrid git commit: Merge branch 'two-dot-o-dev'
of https://git-wip-us.apache.org/repos/asf/incubator-usergrid into
USERGRID-643
Posted by gr...@apache.org.
Merge branch 'two-dot-o-dev' of https://git-wip-us.apache.org/repos/asf/incubator-usergrid into USERGRID-643
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/b1d9ac28
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/b1d9ac28
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/b1d9ac28
Branch: refs/heads/two-dot-o-dev
Commit: b1d9ac283d1bf8ab17a5d9fd305b38447b5799d1
Parents: dbfff99 bc3cafb
Author: Todd Nine <tn...@apigee.com>
Authored: Tue May 12 10:43:32 2015 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Tue May 12 10:43:32 2015 -0700
----------------------------------------------------------------------
.../usergrid/corepersistence/CpRelationManager.java | 4 ++--
.../java/org/apache/usergrid/persistence/Query.java | 13 ++++---------
.../usergrid/services/AbstractCollectionService.java | 4 ++--
.../org/apache/usergrid/services/ServiceRequest.java | 2 +-
4 files changed, 9 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
[07/10] incubator-usergrid git commit: Updates REST api calls. POST
is used to create a re-index job, PUT is used to resume.
Posted by gr...@apache.org.
Updates REST api calls. POST is used to create a re-index job, PUT is used to resume.
Also fixes transitive dependency issue with clouds
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/5a7f9c09
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/5a7f9c09
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/5a7f9c09
Branch: refs/heads/two-dot-o-dev
Commit: 5a7f9c096b5d79950b5df658070ae0bd9bdc77da
Parents: 48be894
Author: Todd Nine <tn...@apigee.com>
Authored: Fri May 15 19:24:17 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Fri May 15 19:24:17 2015 -0600
----------------------------------------------------------------------
.../index/IndexServiceRequestBuilder.java | 88 -----
.../index/IndexServiceRequestBuilderImpl.java | 122 -------
.../index/ReIndexRequestBuilder.java | 86 +++++
.../index/ReIndexRequestBuilderImpl.java | 122 +++++++
.../corepersistence/index/ReIndexService.java | 6 +-
.../index/ReIndexServiceImpl.java | 14 +-
.../PerformanceEntityRebuildIndexTest.java | 6 +-
.../persistence/index/impl/IndexingUtils.java | 2 +
stack/pom.xml | 13 +
.../org/apache/usergrid/rest/IndexResource.java | 342 +++++++++----------
.../main/resources/usergrid-rest-context.xml | 3 -
.../resources/usergrid-rest-deploy-context.xml | 1 -
12 files changed, 401 insertions(+), 404 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5a7f9c09/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceRequestBuilder.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceRequestBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceRequestBuilder.java
deleted file mode 100644
index 07160d8..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceRequestBuilder.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.index;
-
-
-import java.util.UUID;
-
-import org.elasticsearch.action.index.IndexRequestBuilder;
-
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-
-import com.google.common.base.Optional;
-
-
-/**
- * A builder interface to build our re-index request
- */
-public interface IndexServiceRequestBuilder {
-
- /**
- * Set the application id
- */
- IndexServiceRequestBuilder withApplicationId( final UUID applicationId );
-
- /**
- * Set the collection name. If not set, every collection will be reindexed
- * @param collectionName
- * @return
- */
- IndexServiceRequestBuilder withCollection( final String collectionName );
-
- /**
- * Set our cursor to resume processing
- * @param cursor
- * @return
- */
- IndexServiceRequestBuilder withCursor(final String cursor);
-
-
- /**
- * Set the timestamp to re-index entities updated >= this timestamp
- * @param timestamp
- * @return
- */
- IndexServiceRequestBuilder withStartTimestamp(final Long timestamp);
-
-
- /**
- * Get the application scope
- * @return
- */
- Optional<ApplicationScope> getApplicationScope();
-
- /**
- * Get the collection name
- * @return
- */
- Optional<String> getCollectionName();
-
- /**
- * Get the cursor
- * @return
- */
- Optional<String> getCursor();
-
- /**
- * Get the updated since timestamp
- * @return
- */
- Optional<Long> getUpdateTimestamp();
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5a7f9c09/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceRequestBuilderImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceRequestBuilderImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceRequestBuilderImpl.java
deleted file mode 100644
index 4017b6e..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceRequestBuilderImpl.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.index;
-
-
-import java.util.UUID;
-
-import org.apache.usergrid.corepersistence.util.CpNamingUtils;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-
-import com.google.common.base.Optional;
-
-
-/**
- * Index service request builder
- */
-public class IndexServiceRequestBuilderImpl implements IndexServiceRequestBuilder {
-
- private Optional<UUID> withApplicationId = Optional.absent();
- private Optional<String> withCollectionName = Optional.absent();
- private Optional<String> cursor = Optional.absent();
- private Optional<Long> updateTimestamp = Optional.absent();
-
-
- /***
- *
- * @param applicationId The application id
- * @return
- */
- @Override
- public IndexServiceRequestBuilder withApplicationId( final UUID applicationId ) {
- this.withApplicationId = Optional.fromNullable( applicationId );
- return this;
- }
-
-
- /**
- * the colleciton name
- * @param collectionName
- * @return
- */
- @Override
- public IndexServiceRequestBuilder withCollection( final String collectionName ) {
- if(collectionName == null){
- this.withCollectionName = Optional.absent();
- }
- else {
- this.withCollectionName = Optional.fromNullable( CpNamingUtils.getEdgeTypeFromCollectionName( collectionName ) );
- }
- return this;
- }
-
-
- /**
- * The cursor
- * @param cursor
- * @return
- */
- @Override
- public IndexServiceRequestBuilder withCursor( final String cursor ) {
- this.cursor = Optional.fromNullable( cursor );
- return this;
- }
-
-
- /**
- * Set start timestamp in epoch time. Only entities updated since this time will be processed for indexing
- * @param timestamp
- * @return
- */
- @Override
- public IndexServiceRequestBuilder withStartTimestamp( final Long timestamp ) {
- this.updateTimestamp = Optional.fromNullable( timestamp );
- return this;
- }
-
-
- @Override
- public Optional<ApplicationScope> getApplicationScope() {
-
- if ( this.withApplicationId.isPresent() ) {
- return Optional.of( CpNamingUtils.getApplicationScope( withApplicationId.get() ) );
- }
-
- return Optional.absent();
- }
-
-
- @Override
- public Optional<String> getCollectionName() {
- return withCollectionName;
- }
-
-
- @Override
- public Optional<String> getCursor() {
- return cursor;
- }
-
-
- @Override
- public Optional<Long> getUpdateTimestamp() {
- return updateTimestamp;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5a7f9c09/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexRequestBuilder.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexRequestBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexRequestBuilder.java
new file mode 100644
index 0000000..0863a63
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexRequestBuilder.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.index;
+
+
+import java.util.UUID;
+
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+
+import com.google.common.base.Optional;
+
+
+/**
+ * A builder interface to build our re-index request
+ */
+public interface ReIndexRequestBuilder {
+
+ /**
+ * Set the application id
+ */
+ ReIndexRequestBuilder withApplicationId( final UUID applicationId );
+
+ /**
+ * Set the collection name. If not set, every collection will be reindexed
+ * @param collectionName
+ * @return
+ */
+ ReIndexRequestBuilder withCollection( final String collectionName );
+
+ /**
+ * Set our cursor to resume processing
+ * @param cursor
+ * @return
+ */
+ ReIndexRequestBuilder withCursor(final String cursor);
+
+
+ /**
+ * Set the timestamp to re-index entities updated >= this timestamp
+ * @param timestamp
+ * @return
+ */
+ ReIndexRequestBuilder withStartTimestamp(final Long timestamp);
+
+
+ /**
+ * Get the application scope
+ * @return
+ */
+ Optional<ApplicationScope> getApplicationScope();
+
+ /**
+ * Get the collection name
+ * @return
+ */
+ Optional<String> getCollectionName();
+
+ /**
+ * Get the cursor
+ * @return
+ */
+ Optional<String> getCursor();
+
+ /**
+ * Get the updated since timestamp
+ * @return
+ */
+ Optional<Long> getUpdateTimestamp();
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5a7f9c09/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexRequestBuilderImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexRequestBuilderImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexRequestBuilderImpl.java
new file mode 100644
index 0000000..25e71e6
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexRequestBuilderImpl.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.index;
+
+
+import java.util.UUID;
+
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+
+import com.google.common.base.Optional;
+
+
+/**
+ * Index service request builder
+ */
+public class ReIndexRequestBuilderImpl implements ReIndexRequestBuilder {
+
+ private Optional<UUID> withApplicationId = Optional.absent();
+ private Optional<String> withCollectionName = Optional.absent();
+ private Optional<String> cursor = Optional.absent();
+ private Optional<Long> updateTimestamp = Optional.absent();
+
+
+ /***
+ *
+ * @param applicationId The application id
+ * @return
+ */
+ @Override
+ public ReIndexRequestBuilder withApplicationId( final UUID applicationId ) {
+ this.withApplicationId = Optional.fromNullable( applicationId );
+ return this;
+ }
+
+
+ /**
+ * The collection name
+ * @param collectionName
+ * @return
+ */
+ @Override
+ public ReIndexRequestBuilder withCollection( final String collectionName ) {
+ if(collectionName == null){
+ this.withCollectionName = Optional.absent();
+ }
+ else {
+ this.withCollectionName = Optional.fromNullable( CpNamingUtils.getEdgeTypeFromCollectionName( collectionName ) );
+ }
+ return this;
+ }
+
+
+ /**
+ * The cursor
+ * @param cursor
+ * @return
+ */
+ @Override
+ public ReIndexRequestBuilder withCursor( final String cursor ) {
+ this.cursor = Optional.fromNullable( cursor );
+ return this;
+ }
+
+
+ /**
+ * Set start timestamp in epoch time. Only entities updated since this time will be processed for indexing
+ * @param timestamp
+ * @return
+ */
+ @Override
+ public ReIndexRequestBuilder withStartTimestamp( final Long timestamp ) {
+ this.updateTimestamp = Optional.fromNullable( timestamp );
+ return this;
+ }
+
+
+ @Override
+ public Optional<ApplicationScope> getApplicationScope() {
+
+ if ( this.withApplicationId.isPresent() ) {
+ return Optional.of( CpNamingUtils.getApplicationScope( withApplicationId.get() ) );
+ }
+
+ return Optional.absent();
+ }
+
+
+ @Override
+ public Optional<String> getCollectionName() {
+ return withCollectionName;
+ }
+
+
+ @Override
+ public Optional<String> getCursor() {
+ return cursor;
+ }
+
+
+ @Override
+ public Optional<Long> getUpdateTimestamp() {
+ return updateTimestamp;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5a7f9c09/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java
index af3615e..bae8d1f 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java
@@ -29,15 +29,15 @@ public interface ReIndexService {
/**
* Perform an index rebuild
*
- * @param indexServiceRequestBuilder The builder to build the request
+ * @param reIndexRequestBuilder The builder to build the request
*/
- ReIndexStatus rebuildIndex( final IndexServiceRequestBuilder indexServiceRequestBuilder );
+ ReIndexStatus rebuildIndex( final ReIndexRequestBuilder reIndexRequestBuilder );
/**
* Generate a build for the index
*/
- IndexServiceRequestBuilder getBuilder();
+ ReIndexRequestBuilder getBuilder();
/**
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5a7f9c09/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
index f44113b..ef03866 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
@@ -91,16 +91,16 @@ public class ReIndexServiceImpl implements ReIndexService {
@Override
- public ReIndexStatus rebuildIndex( final IndexServiceRequestBuilder indexServiceRequestBuilder ) {
+ public ReIndexStatus rebuildIndex( final ReIndexRequestBuilder reIndexRequestBuilder ) {
//load our last emitted Scope if a cursor is present
- final Optional<EdgeScope> cursor = parseCursor( indexServiceRequestBuilder.getCursor() );
+ final Optional<EdgeScope> cursor = parseCursor( reIndexRequestBuilder.getCursor() );
final CursorSeek<Edge> cursorSeek = getResumeEdge( cursor );
- final Optional<ApplicationScope> appId = indexServiceRequestBuilder.getApplicationScope();
+ final Optional<ApplicationScope> appId = reIndexRequestBuilder.getApplicationScope();
Preconditions.checkArgument( !(cursor.isPresent() && appId.isPresent()),
@@ -111,11 +111,11 @@ public class ReIndexServiceImpl implements ReIndexService {
final String jobId = StringUtils.sanitizeUUID( UUIDGenerator.newTimeUUID() );
- final long modifiedSince = indexServiceRequestBuilder.getUpdateTimestamp().or( Long.MIN_VALUE );
+ final long modifiedSince = reIndexRequestBuilder.getUpdateTimestamp().or( Long.MIN_VALUE );
//create an observable that loads each entity and indexes it, start it running with publish
final Observable<EdgeScope> runningReIndex = allEntityIdsObservable.getEdgesToEntities( applicationScopes,
- indexServiceRequestBuilder.getCollectionName(), cursorSeek.getSeekValue() )
+ reIndexRequestBuilder.getCollectionName(), cursorSeek.getSeekValue() )
//for each edge, create our scope and index on it
.doOnNext( edge -> {
@@ -143,8 +143,8 @@ public class ReIndexServiceImpl implements ReIndexService {
@Override
- public IndexServiceRequestBuilder getBuilder() {
- return new IndexServiceRequestBuilderImpl();
+ public ReIndexRequestBuilder getBuilder() {
+ return new ReIndexRequestBuilderImpl();
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5a7f9c09/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java b/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
index 8d54043..06eb060 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
@@ -33,7 +33,7 @@ import org.apache.commons.lang.RandomStringUtils;
import org.apache.usergrid.AbstractCoreIT;
import org.apache.usergrid.cassandra.SpringResource;
-import org.apache.usergrid.corepersistence.index.IndexServiceRequestBuilder;
+import org.apache.usergrid.corepersistence.index.ReIndexRequestBuilder;
import org.apache.usergrid.corepersistence.index.ReIndexService;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
@@ -153,7 +153,7 @@ public class PerformanceEntityRebuildIndexTest extends AbstractCoreIT {
logger.debug( "Preparing to rebuild all indexes" );
- final IndexServiceRequestBuilder builder =
+ final ReIndexRequestBuilder builder =
reIndexService.getBuilder().withApplicationId( em.getApplicationId() ).withCollection( "catherders" );
ReIndexService.ReIndexStatus status = reIndexService.rebuildIndex( builder );
@@ -270,7 +270,7 @@ public class PerformanceEntityRebuildIndexTest extends AbstractCoreIT {
try {
- final IndexServiceRequestBuilder builder =
+ final ReIndexRequestBuilder builder =
reIndexService.getBuilder().withApplicationId( em.getApplicationId() );
ReIndexService.ReIndexStatus status = reIndexService.rebuildIndex( builder );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5a7f9c09/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexingUtils.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexingUtils.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexingUtils.java
index 8b248aa..bc15149 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexingUtils.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/IndexingUtils.java
@@ -99,6 +99,8 @@ public class IndexingUtils {
/**
* Create our sub scope. This is the ownerUUID + type
+ *
+ * TODO make this format more readable and parsable
*/
public static String createContextName( final ApplicationScope applicationScope, final SearchEdge scope ) {
StringBuilder sb = new StringBuilder();
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5a7f9c09/stack/pom.xml
----------------------------------------------------------------------
diff --git a/stack/pom.xml b/stack/pom.xml
index 762607d..4559db2 100644
--- a/stack/pom.xml
+++ b/stack/pom.xml
@@ -1241,6 +1241,13 @@
<groupId>org.apache.jclouds</groupId>
<artifactId>jclouds-blobstore</artifactId>
<version>${jclouds.version}</version>
+ <exclusions>
+ <!-- blows up our version of guice-->
+ <exclusion>
+ <groupId>com.google.inject.extensions</groupId>
+ <artifactId>guice-assistedinject</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
@@ -1262,6 +1269,12 @@
<groupId>aopalliance</groupId>
<artifactId>aopalliance</artifactId>
</exclusion>
+
+ <!-- blows up our version of guice-->
+ <exclusion>
+ <groupId>com.google.inject.extensions</groupId>
+ <artifactId>guice-assistedinject</artifactId>
+ </exclusion>
</exclusions>
</dependency>
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5a7f9c09/stack/rest/src/main/java/org/apache/usergrid/rest/IndexResource.java
----------------------------------------------------------------------
diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/IndexResource.java b/stack/rest/src/main/java/org/apache/usergrid/rest/IndexResource.java
index 668c05c..ffb9700 100644
--- a/stack/rest/src/main/java/org/apache/usergrid/rest/IndexResource.java
+++ b/stack/rest/src/main/java/org/apache/usergrid/rest/IndexResource.java
@@ -20,25 +20,34 @@
package org.apache.usergrid.rest;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.sun.jersey.api.json.JSONWithPadding;
-import org.apache.usergrid.persistence.EntityManagerFactory;
-import org.apache.usergrid.persistence.EntityRef;
-import org.apache.usergrid.persistence.index.utils.UUIDUtils;
-import org.apache.usergrid.rest.security.annotations.RequireSystemAccess;
+
+import java.util.Map;
+import java.util.UUID;
+
+import javax.ws.rs.DefaultValue;
+import javax.ws.rs.POST;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.UriInfo;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
-import javax.ws.rs.*;
-import javax.ws.rs.core.Context;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.UriInfo;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
+import org.apache.usergrid.corepersistence.index.ReIndexRequestBuilder;
+import org.apache.usergrid.corepersistence.index.ReIndexService;
+import org.apache.usergrid.persistence.index.utils.UUIDUtils;
+import org.apache.usergrid.rest.security.annotations.RequireSystemAccess;
+
+import com.google.common.base.Preconditions;
+import com.sun.jersey.api.json.JSONWithPadding;
+
/**
* Classy class class.
@@ -46,260 +55,239 @@ import java.util.UUID;
@Component
@Scope( "singleton" )
@Produces( {
- MediaType.APPLICATION_JSON, "application/javascript", "application/x-javascript", "text/ecmascript",
- "application/ecmascript", "text/jscript"
+ MediaType.APPLICATION_JSON, "application/javascript", "application/x-javascript", "text/ecmascript",
+ "application/ecmascript", "text/jscript"
} )
public class IndexResource extends AbstractContextResource {
- private static final Logger logger = LoggerFactory.getLogger(IndexResource.class);
+ private static final Logger logger = LoggerFactory.getLogger( IndexResource.class );
+ private static final String UPDATED_FIELD = "updated";
- public IndexResource(){
+
+ public IndexResource() {
super();
}
+
@RequireSystemAccess
- @PUT
+ @POST
@Path( "rebuild" )
- public JSONWithPadding rebuildIndexes( @Context UriInfo ui,
- @QueryParam( "callback" ) @DefaultValue( "callback" ) String callback )
- throws Exception {
+ public JSONWithPadding rebuildIndexesPost( @QueryParam( "callback" ) @DefaultValue( "callback" ) String callback )
+ throws Exception {
- ApiResponse response = createApiResponse();
- response.setAction( "rebuild indexes" );
+ logger.info( "Rebuilding all applications" );
- final EntityManagerFactory.ProgressObserver po = new EntityManagerFactory.ProgressObserver() {
+ final ReIndexRequestBuilder request = createRequest();
+ return executeAndCreateResponse( request, callback );
+ }
- @Override
- public void onProgress( final EntityRef entity ) {
- logger.info( "Indexing entity {}:{} ", entity.getType(), entity.getUuid() );
- }
- };
+ @RequireSystemAccess
+ @PUT
+ @Path( "rebuild" )
+ public JSONWithPadding rebuildIndexesPut( final Map<String, Object> payload,
+ @QueryParam( "callback" ) @DefaultValue( "callback" ) String callback )
+ throws Exception {
- final Thread rebuild = new Thread() {
+ logger.info( "Resuming rebuilding all applications" );
+ final ReIndexRequestBuilder request = createRequest();
- @Override
- public void run() {
- logger.info( "Rebuilding all indexes" );
+ return executeResumeAndCreateResponse( payload, request, callback );
+ }
- try {
- emf.rebuildAllIndexes( po );
- }
- catch ( Exception e ) {
- logger.error( "Unable to rebuild indexes", e );
- }
- logger.info( "Completed all indexes" );
- }
- };
+ @RequireSystemAccess
+ @POST
+ @Path( "rebuild/" + RootResource.APPLICATION_ID_PATH )
+ public JSONWithPadding rebuildIndexesPut( @PathParam( "applicationId" ) String applicationIdStr,
+ @QueryParam( "callback" ) @DefaultValue( "callback" ) String callback,
+ @QueryParam( "delay" ) @DefaultValue( "10" ) final long delay )
- rebuild.setName( "Index rebuild all usergrid" );
- rebuild.setDaemon( true );
- rebuild.start();
+ throws Exception {
- response.setSuccess();
+ logger.info( "Rebuilding application {}", applicationIdStr );
- return new JSONWithPadding( response, callback );
+
+ final UUID appId = UUIDUtils.tryExtractUUID( applicationIdStr );
+
+ final ReIndexRequestBuilder request = createRequest().withApplicationId( appId );
+
+ return executeAndCreateResponse( request, callback );
}
@RequireSystemAccess
@PUT
@Path( "rebuild/" + RootResource.APPLICATION_ID_PATH )
- public JSONWithPadding rebuildIndexes( @Context UriInfo ui, @PathParam( "applicationId" ) String applicationIdStr,
- @QueryParam( "callback" ) @DefaultValue( "callback" ) String callback,
- @QueryParam( "delay" ) @DefaultValue( "10" ) final long delay )
-
- throws Exception {
-
- final UUID appId = UUIDUtils.tryExtractUUID(applicationIdStr);
- ApiResponse response = createApiResponse();
- response.setAction( "rebuild indexes started" );
+ public JSONWithPadding rebuildIndexesPut( final Map<String, Object> payload,
+ @PathParam( "applicationId" ) String applicationIdStr,
+ @QueryParam( "callback" ) @DefaultValue( "callback" ) String callback,
+ @QueryParam( "delay" ) @DefaultValue( "10" ) final long delay )
- final EntityManagerFactory.ProgressObserver po = new EntityManagerFactory.ProgressObserver() {
+ throws Exception {
- @Override
- public void onProgress( final EntityRef entity ) {
- logger.info( "Indexing entity {}:{}", entity.getType(), entity.getUuid() );
- }
+ logger.info( "Resuming rebuilding application {}", applicationIdStr );
+ final UUID appId = UUIDUtils.tryExtractUUID( applicationIdStr );
- };
-
-
- final Thread rebuild = new Thread() {
-
- @Override
- public void run() {
-
+ final ReIndexRequestBuilder request = createRequest().withApplicationId( appId );
- logger.info( "Started rebuilding application {} in collection ", appId );
+ return executeResumeAndCreateResponse( payload, request, callback );
+ }
- try {
- emf.rebuildApplicationIndexes( appId, po );
- }
- catch ( Exception e ) {
- logger.error( "Unable to re-index application", e );
- }
+ @RequireSystemAccess
+ @POST
+ @Path( "rebuild/" + RootResource.APPLICATION_ID_PATH + "/{collectionName}" )
+ public JSONWithPadding rebuildIndexesPost( @PathParam( "applicationId" ) final String applicationIdStr,
+ @PathParam( "collectionName" ) final String collectionName,
+ @QueryParam( "reverse" ) @DefaultValue( "false" ) final Boolean reverse,
+ @QueryParam( "callback" ) @DefaultValue( "callback" ) String callback )
+ throws Exception {
- logger.info( "Completed rebuilding application {} in collection ", appId );
- }
- };
+ logger.info( "Rebuilding collection {} in application {}", collectionName, applicationIdStr );
- rebuild.setName( String.format( "Index rebuild for app %s", appId ) );
- rebuild.setDaemon( true );
- rebuild.start();
+ final UUID appId = UUIDUtils.tryExtractUUID( applicationIdStr );
- response.setSuccess();
+ final ReIndexRequestBuilder request =
+ createRequest().withApplicationId( appId ).withCollection( collectionName );
- return new JSONWithPadding( response, callback );
+ return executeAndCreateResponse( request, callback );
}
@RequireSystemAccess
@PUT
@Path( "rebuild/" + RootResource.APPLICATION_ID_PATH + "/{collectionName}" )
- public JSONWithPadding rebuildIndexes(
- @Context UriInfo ui,
- @PathParam( "applicationId" ) final String applicationIdStr,
- @PathParam( "collectionName" ) final String collectionName,
- @QueryParam( "reverse" ) @DefaultValue( "false" ) final Boolean reverse,
- @QueryParam( "callback" ) @DefaultValue( "callback" ) String callback) throws Exception {
+ public JSONWithPadding rebuildIndexesPut( final Map<String, Object> payload,
+ @PathParam( "applicationId" ) final String applicationIdStr,
+ @PathParam( "collectionName" ) final String collectionName,
+ @QueryParam( "reverse" ) @DefaultValue( "false" ) final Boolean reverse,
+ @QueryParam( "callback" ) @DefaultValue( "callback" ) String callback )
+ throws Exception {
- final UUID appId = UUIDUtils.tryExtractUUID( applicationIdStr );
- ApiResponse response = createApiResponse();
- response.setAction( "rebuild indexes" );
+ logger.info( "Resuming rebuilding collection {} in application {}", collectionName, applicationIdStr );
- final Thread rebuild = new Thread() {
+ final UUID appId = UUIDUtils.tryExtractUUID( applicationIdStr );
- public void run() {
+ final ReIndexRequestBuilder request =
+ createRequest().withApplicationId( appId ).withCollection( collectionName );
- logger.info( "Started rebuilding application {} in collection {}", appId, collectionName );
+ return executeResumeAndCreateResponse( payload, request, callback );
+ }
- try {
- rebuildCollection( appId, collectionName, reverse );
- } catch (Exception e) {
- // TODO: handle this in rebuildCollection() instead
- throw new RuntimeException("Error rebuilding collection");
- }
+ @RequireSystemAccess
+ @POST
+ @Path( "rebuild/management" )
+ public JSONWithPadding rebuildInternalIndexesPost(
+ @QueryParam( "callback" ) @DefaultValue( "callback" ) String callback ) throws Exception {
- logger.info( "Completed rebuilding application {} in collection {}", appId, collectionName );
- }
- };
- rebuild.setName( String.format( "Index rebuild for app %s and collection %s", appId, collectionName ) );
- rebuild.setDaemon( true );
- rebuild.start();
+ final UUID managementAppId = emf.getManagementAppId();
- response.setSuccess();
+ logger.info( "Rebuilding management application with id {} ", managementAppId );
+ final ReIndexRequestBuilder request = createRequest().withApplicationId( managementAppId );
- return new JSONWithPadding( response, callback );
+ return executeAndCreateResponse( request, callback );
}
+
@RequireSystemAccess
- @PUT
- @Path( "rebuildinternal" )
- public JSONWithPadding rebuildInternalIndexes(
- @Context UriInfo ui,
- @PathParam( "applicationId" ) String applicationIdStr,
- @QueryParam( "callback" ) @DefaultValue( "callback" ) String callback,
- @QueryParam( "delay" ) @DefaultValue( "10" ) final long delay ) throws Exception {
+ @POST
+ @Path( "rebuild/management" )
+ public JSONWithPadding rebuildInternalIndexesPut( final Map<String, Object> payload,
+ @QueryParam( "callback" ) @DefaultValue( "callback" )
+ String callback ) throws Exception {
- final UUID appId = UUIDUtils.tryExtractUUID(applicationIdStr);
- ApiResponse response = createApiResponse();
- response.setAction( "rebuild indexes started" );
+ final UUID managementAppId = emf.getManagementAppId();
- final EntityManagerFactory.ProgressObserver po = new EntityManagerFactory.ProgressObserver() {
+ logger.info( "Resuming rebuilding management application with id {} ", managementAppId );
+ final ReIndexRequestBuilder request = createRequest().withApplicationId( managementAppId );
- @Override
- public void onProgress( final EntityRef entity ) {
- logger.info( "Indexing entity {}:{}", entity.getType(), entity.getUuid() );
- }
+ return executeResumeAndCreateResponse( payload, request, callback );
+ }
- };
- final Thread rebuild = new Thread() {
+ @RequireSystemAccess
+ @POST
+ public JSONWithPadding addIndex( @Context UriInfo ui, Map<String, Object> config,
+ @QueryParam( "callback" ) @DefaultValue( "callback" ) String callback )
+ throws Exception {
- @Override
- public void run() {
+ Preconditions
+ .checkNotNull( config, "Payload for config is null, please pass {replicas:int, shards:int} in body" );
- logger.info( "Started rebuilding internal indexes", appId );
+ ApiResponse response = createApiResponse();
- try {
- emf.rebuildInternalIndexes( po );
- }
- catch ( Exception e ) {
- logger.error( "Unable to re-index internals", e );
- }
+ if ( !config.containsKey( "replicas" ) || !config.containsKey( "shards" ) ||
+ !( config.get( "replicas" ) instanceof Integer ) || !( config.get( "shards" ) instanceof Integer ) ) {
+ throw new IllegalArgumentException( "body must contains 'replicas' of type int and 'shards' of type int" );
+ }
- logger.info( "Completed rebuilding internal indexes" );
- }
- };
+ if ( !config.containsKey( "indexSuffix" ) ) {
+ throw new IllegalArgumentException( "Please add an indexSuffix to your post" );
+ }
- rebuild.setName( String.format( "Index rebuild for app %s", appId ) );
- rebuild.setDaemon( true );
- rebuild.start();
- response.setSuccess();
+ emf.addIndex( config.get( "indexSuffix" ).toString(), ( int ) config.get( "shards" ),
+ ( int ) config.get( "replicas" ), ( String ) config.get( "writeConsistency" ) );
+ response.setAction( "Add index to alias" );
return new JSONWithPadding( response, callback );
}
- @RequireSystemAccess
- @POST
- @Path( RootResource.APPLICATION_ID_PATH )
- public JSONWithPadding addIndex(@Context UriInfo ui,
- Map<String, Object> config,
- @QueryParam( "callback" ) @DefaultValue( "callback" ) String callback) throws Exception{
- Preconditions.checkNotNull(config,"Payload for config is null, please pass {replicas:int, shards:int} in body");
+ private ReIndexService getReIndexService() {
+ return injector.getInstance( ReIndexService.class );
+ }
- ApiResponse response = createApiResponse();
- if (!config.containsKey("replicas") || !config.containsKey("shards") ||
- !(config.get("replicas") instanceof Integer) || !(config.get("shards") instanceof Integer)){
- throw new IllegalArgumentException("body must contains 'replicas' of type int and 'shards' of type int");
- }
+ private ReIndexRequestBuilder createRequest() {
+ return createRequest();
+ }
- if(!config.containsKey("indexSuffix")) {
- throw new IllegalArgumentException("Please add an indexSuffix to your post");
- }
+ private JSONWithPadding executeResumeAndCreateResponse( final Map<String, Object> payload,
+ final ReIndexRequestBuilder request,
+ final String callback ) {
- emf.addIndex( config.get("indexSuffix").toString(),
- (int) config.get("shards"),(int) config.get("replicas"),(String)config.get("writeConsistency"));
- response.setAction("Add index to alias");
+ Preconditions.checkArgument( payload.containsKey( UPDATED_FIELD ),
+ "You must specified the field \"updated\" in the payload" );
- return new JSONWithPadding(response, callback);
+ //add our updated timestamp to the request
+ if ( !payload.containsKey( UPDATED_FIELD ) ) {
+ final long timestamp = ( long ) payload.get( UPDATED_FIELD );
+ request.withStartTimestamp( timestamp );
+ }
+ return executeAndCreateResponse( request, callback );
}
- private void rebuildCollection(
- final UUID applicationId,
- final String collectionName,
- final boolean reverse) throws Exception {
- EntityManagerFactory.ProgressObserver po = new EntityManagerFactory.ProgressObserver() {
+ /**
+ * Execute the request and return the response.
+ */
+ private JSONWithPadding executeAndCreateResponse( final ReIndexRequestBuilder request, final String callback ) {
- @Override
- public void onProgress( final EntityRef entity ) {
- logger.info( "Indexing entity {}:{}", entity.getType(), entity.getUuid() );
- }
- };
+ final ReIndexService.ReIndexStatus status = getReIndexService().rebuildIndex( request );
- logger.info( "Reindexing for app id: {} and collection {}", applicationId, collectionName );
+ final ApiResponse response = createApiResponse();
- emf.rebuildCollectionIndex(Optional.of(applicationId),Optional.of(collectionName));
- getEntityIndex().refreshAsync().toBlocking().first();
- }
+ response.setAction( "rebuild indexes" );
+ response.setProperty( "jobId", status.getJobId() );
+ response.setProperty( "status", status.getStatus() );
+ response.setProperty( "lastUpdatedEpoch", status.getLastUpdated() );
+ response.setProperty( "numberQueued", status.getNumberProcessed() );
+ response.setSuccess();
+ return new JSONWithPadding( response, callback );
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5a7f9c09/stack/rest/src/main/resources/usergrid-rest-context.xml
----------------------------------------------------------------------
diff --git a/stack/rest/src/main/resources/usergrid-rest-context.xml b/stack/rest/src/main/resources/usergrid-rest-context.xml
index 5c63e72..16f4541 100644
--- a/stack/rest/src/main/resources/usergrid-rest-context.xml
+++ b/stack/rest/src/main/resources/usergrid-rest-context.xml
@@ -47,9 +47,6 @@
<property name="securityManager" ref="securityManager" />
</bean>
- <bean id="mongoServer" class="org.apache.usergrid.mongo.MongoServer"
- init-method="startServer" destroy-method="stopServer" />
-
<!-- override the security manager -->
<bean id="securityManager" class="org.apache.usergrid.rest.security.shiro.RestSecurityManager">
<property name="cacheManager" ref="cacheManager" />
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/5a7f9c09/stack/rest/src/main/resources/usergrid-rest-deploy-context.xml
----------------------------------------------------------------------
diff --git a/stack/rest/src/main/resources/usergrid-rest-deploy-context.xml b/stack/rest/src/main/resources/usergrid-rest-deploy-context.xml
index 9965dbc..e9b7ccd 100644
--- a/stack/rest/src/main/resources/usergrid-rest-deploy-context.xml
+++ b/stack/rest/src/main/resources/usergrid-rest-deploy-context.xml
@@ -33,7 +33,6 @@
<property name="locations">
<list>
<value>classpath:/usergrid-default.properties</value>
- <value>${usergrid-custom-spring-properties}</value>
</list>
</property>
</bean>
[09/10] incubator-usergrid git commit: Addresses some of George's
comments
Posted by gr...@apache.org.
Addresses some of George's comments
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/53aa87ca
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/53aa87ca
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/53aa87ca
Branch: refs/heads/two-dot-o-dev
Commit: 53aa87cad41b6d7320c24af4a5e71061fe2a4dd3
Parents: 8fde7c5
Author: Todd Nine <tn...@apigee.com>
Authored: Wed May 20 15:04:09 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Wed May 20 15:04:09 2015 -0600
----------------------------------------------------------------------
.../usergrid/persistence/PerformanceEntityRebuildIndexTest.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/53aa87ca/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java b/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
index 06eb060..318a378 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
@@ -314,7 +314,7 @@ public class PerformanceEntityRebuildIndexTest extends AbstractCoreIT {
}
}
catch ( IllegalArgumentException iae ) {
- //swallow
+ //swallow. Thrown if our job can't be found, i.e. its status hasn't been updated yet
}
[08/10] incubator-usergrid git commit: Fixes exception on missing
keyspace during bootstrap
Posted by gr...@apache.org.
Fixes exception on missing keyspace during bootstrap
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/8fde7c5d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/8fde7c5d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/8fde7c5d
Branch: refs/heads/two-dot-o-dev
Commit: 8fde7c5dfd7d06c17b0a2c2a3ac1053bf3d97e76
Parents: 5a7f9c0
Author: Todd Nine <tn...@apigee.com>
Authored: Fri May 15 19:38:19 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Fri May 15 19:38:19 2015 -0600
----------------------------------------------------------------------
.../data/MigrationInfoSerializationImpl.java | 4 +-
.../migration/schema/MigrationManagerImpl.java | 14 ++----
.../core/migration/util/AstayanxUtils.java | 49 ++++++++++++++++++++
3 files changed, 56 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8fde7c5d/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/MigrationInfoSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/MigrationInfoSerializationImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/MigrationInfoSerializationImpl.java
index 4a349fd..3def798 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/MigrationInfoSerializationImpl.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/data/MigrationInfoSerializationImpl.java
@@ -35,6 +35,7 @@ import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamilyDef
import org.apache.usergrid.persistence.core.astyanax.ScopedRowKey;
import org.apache.usergrid.persistence.core.astyanax.ScopedRowKeySerializer;
import org.apache.usergrid.persistence.core.astyanax.StringRowCompositeSerializer;
+import org.apache.usergrid.persistence.core.migration.util.AstayanxUtils;
import org.apache.usergrid.persistence.model.entity.Id;
import org.apache.usergrid.persistence.model.entity.SimpleId;
@@ -149,7 +150,8 @@ public class MigrationInfoSerializationImpl implements MigrationInfoSerializatio
return 0;
}
catch ( ConnectionException e ) {
- throw new DataMigrationException( "Unable to retrieve status", e );
+ AstayanxUtils.isKeyspaceMissing("Unable to connect to cassandra to retrieve status", e);
+ return 0;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8fde7c5d/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/schema/MigrationManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/schema/MigrationManagerImpl.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/schema/MigrationManagerImpl.java
index 31aa1b3..26351cf 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/schema/MigrationManagerImpl.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/schema/MigrationManagerImpl.java
@@ -28,6 +28,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.usergrid.persistence.core.astyanax.MultiTennantColumnFamilyDefinition;
+import org.apache.usergrid.persistence.core.migration.util.AstayanxUtils;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
@@ -131,22 +132,15 @@ public class MigrationManagerImpl implements MigrationManager {
try {
keyspaceDefinition = keyspace.describeKeyspace();
- }
- catch ( BadRequestException badRequestException ) {
-
- //check if it's b/c the keyspace is missing, if so
- final String message = badRequestException.getMessage();
-
- boolean missingKeyspace = message.contains( "why:Keyspace" ) && message.contains( "does not exist" );
- if ( !missingKeyspace ) {
- throw badRequestException;
- }
}catch( NotFoundException nfe){
//if we execute this immediately after a drop keyspace in 1.2.x, Cassandra is returning the NFE instead of a BadRequestException
//swallow and log, then continue to create the keyspaces.
logger.info( "Received a NotFoundException when attempting to describe keyspace. It does not exist" );
}
+ catch(Exception e){
+ AstayanxUtils.isKeyspaceMissing("Unable to connect to cassandra", e);
+ }
if ( keyspaceDefinition != null ) {
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/8fde7c5d/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/util/AstayanxUtils.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/util/AstayanxUtils.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/util/AstayanxUtils.java
new file mode 100644
index 0000000..7ae4748
--- /dev/null
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/migration/util/AstayanxUtils.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.persistence.core.migration.util;
+
+
+import com.netflix.astyanax.connectionpool.exceptions.BadRequestException;
+
+
+public class AstayanxUtils {
+
+ /**
+ * Swallow the exception when it signals a missing keyspace, otherwise rethrow it wrapped in a RuntimeException
+ * @param rethrowMessage The message to add to the exception if rethrown
+ * @param cassandraException The exception from cassandra
+ * @throws RuntimeException wrapping cassandraException when it is not a missing keyspace BadRequestException
+ */
+ public static void isKeyspaceMissing(final String rethrowMessage, final Exception cassandraException ) {
+
+ if ( cassandraException instanceof BadRequestException ) {
+
+ //check if it's b/c the keyspace is missing; the message may be null, which is treated as "not missing"
+ final String message = cassandraException.getMessage();
+
+ //no op, just swallow
+ if ( message != null && message.contains( "why:Keyspace" ) && message.contains( "does not exist" ) ) {
+ return;
+ }
+ }
+
+ throw new RuntimeException( rethrowMessage, cassandraException );
+ }
+}
[10/10] incubator-usergrid git commit: Merge remote-tracking branch
'origin/USERGRID-643' into two-dot-o-dev
Posted by gr...@apache.org.
Merge remote-tracking branch 'origin/USERGRID-643' into two-dot-o-dev
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/b28aeee0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/b28aeee0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/b28aeee0
Branch: refs/heads/two-dot-o-dev
Commit: b28aeee0522a3ebef8e2d349676ea381baf3ee37
Parents: b637535 53aa87c
Author: GERey <gr...@apigee.com>
Authored: Wed May 20 16:00:15 2015 -0700
Committer: GERey <gr...@apigee.com>
Committed: Wed May 20 16:00:15 2015 -0700
----------------------------------------------------------------------
.../usergrid/corepersistence/CoreModule.java | 4 +
.../corepersistence/CpEntityManagerFactory.java | 14 -
.../corepersistence/CpRelationManager.java | 1 -
.../asyncevents/EventBuilder.java | 6 +-
.../asyncevents/EventBuilderImpl.java | 18 +-
.../asyncevents/InMemoryAsyncEventService.java | 7 +-
.../asyncevents/SQSAsyncEventService.java | 9 +-
.../index/EdgeScopeSerializer.java | 41 +++
.../index/EntityIndexOperation.java | 46 +++
.../index/IndexProcessorFig.java | 6 +-
.../corepersistence/index/ReIndexAction.java | 6 +-
.../index/ReIndexRequestBuilder.java | 86 +++++
.../index/ReIndexRequestBuilderImpl.java | 122 +++++++
.../corepersistence/index/ReIndexService.java | 89 +++--
.../index/ReIndexServiceImpl.java | 251 ++++++++++++--
.../cursor/AbstractCursorSerializer.java | 2 +-
.../pipeline/cursor/CursorSerializerUtil.java | 54 ++-
.../pipeline/cursor/RequestCursor.java | 9 +-
.../pipeline/cursor/ResponseCursor.java | 49 +--
.../pipeline/read/AbstractPathFilter.java | 30 --
.../pipeline/read/CursorSeek.java | 53 +++
.../rx/impl/AllEntityIdsObservable.java | 5 +-
.../rx/impl/AllEntityIdsObservableImpl.java | 6 +-
.../util/SerializableMapper.java | 91 -----
.../persistence/EntityManagerFactory.java | 2 -
.../rx/EdgesToTargetObservableIT.java | 4 +-
.../PerformanceEntityRebuildIndexTest.java | 346 ++++++++-----------
.../data/MigrationInfoSerializationImpl.java | 4 +-
.../migration/schema/MigrationManagerImpl.java | 14 +-
.../core/migration/util/AstayanxUtils.java | 49 +++
.../graph/serialization/EdgesObservable.java | 24 +-
.../serialization/impl/EdgesObservableImpl.java | 10 +-
.../impl/TargetIdObservableImpl.java | 2 +-
.../impl/migration/EdgeDataMigrationImpl.java | 2 +-
.../persistence/index/impl/IndexingUtils.java | 2 +
stack/pom.xml | 13 +
.../org/apache/usergrid/rest/IndexResource.java | 342 +++++++++---------
.../main/resources/usergrid-rest-context.xml | 3 -
.../resources/usergrid-rest-deploy-context.xml | 1 -
39 files changed, 1138 insertions(+), 685 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b28aeee0/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b28aeee0/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/b28aeee0/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java
----------------------------------------------------------------------
[04/10] incubator-usergrid git commit: Finishes changes before tests
Posted by gr...@apache.org.
Finishes changes before tests
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/20c9b350
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/20c9b350
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/20c9b350
Branch: refs/heads/two-dot-o-dev
Commit: 20c9b3509cf96a6ecab1a45a2c572fd6a041e00d
Parents: cb179d3
Author: Todd Nine <tn...@apigee.com>
Authored: Thu May 14 17:19:27 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Thu May 14 17:19:27 2015 -0600
----------------------------------------------------------------------
.../asyncevents/InMemoryAsyncEventService.java | 2 -
.../index/EdgeScopeSerializer.java | 41 ++++
.../index/IndexProcessorFig.java | 6 +-
.../corepersistence/index/ReIndexService.java | 75 +++---
.../index/ReIndexServiceImpl.java | 226 +++++++++++++++----
.../pipeline/cursor/CursorSerializerUtil.java | 54 ++++-
.../pipeline/cursor/RequestCursor.java | 9 +-
.../pipeline/cursor/ResponseCursor.java | 49 ++--
.../pipeline/read/AbstractPathFilter.java | 30 ---
.../pipeline/read/CursorSeek.java | 53 +++++
.../rx/impl/AllEntityIdsObservable.java | 4 +-
.../rx/impl/AllEntityIdsObservableImpl.java | 5 +-
.../PerformanceEntityRebuildIndexTest.java | 5 +-
.../graph/serialization/EdgesObservable.java | 21 +-
.../serialization/impl/EdgesObservableImpl.java | 4 +-
15 files changed, 422 insertions(+), 162 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/20c9b350/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
index 96966bf..ddcf826 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
@@ -99,8 +99,6 @@ public class InMemoryAsyncEventService implements AsyncEventService {
@Override
public void index( final EntityIndexOperation entityIndexOperation ) {
-
-
run(eventBuilder.index( entityIndexOperation ));
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/20c9b350/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/EdgeScopeSerializer.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/EdgeScopeSerializer.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/EdgeScopeSerializer.java
new file mode 100644
index 0000000..2a6a5ac
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/EdgeScopeSerializer.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.index;
+
+
+import org.apache.usergrid.corepersistence.pipeline.cursor.AbstractCursorSerializer;
+import org.apache.usergrid.corepersistence.rx.impl.EdgeScope;
+
+
+/**
+ * Cursor serializer for {@link EdgeScope} values; a shared instance is exposed as {@link #INSTANCE}
+ */
+public class EdgeScopeSerializer extends AbstractCursorSerializer<EdgeScope> {
+
+
+ public static final EdgeScopeSerializer INSTANCE = new EdgeScopeSerializer();
+
+ @Override
+ protected Class<EdgeScope> getType() {
+ return EdgeScope.class;
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/20c9b350/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
index fe9d3fd..8e835e2 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
@@ -78,9 +78,9 @@ public interface IndexProcessorFig extends GuicyFig {
String getQueueImplementation();
- @Default("30000")
- @Key("elasticsearch.reindex.sample.interval")
- long getReIndexSampleInterval();
+ @Default("10000")
+ @Key("elasticsearch.reindex.flush.interval")
+ int getUpdateInterval();
@Default("false")
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/20c9b350/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java
index b25eca5..f8955dd 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java
@@ -20,22 +20,6 @@
package org.apache.usergrid.corepersistence.index;
-import java.util.UUID;
-import java.util.concurrent.Callable;
-import java.util.concurrent.FutureTask;
-
-import org.apache.usergrid.corepersistence.rx.impl.AllEntityIdsObservable;
-import org.apache.usergrid.corepersistence.rx.impl.EdgeScope;
-import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
-import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-
-import com.google.common.base.Optional;
-
-import rx.Observable;
-import rx.Observer;
-import rx.observables.ConnectableObservable;
-
-
/**
* An interface for re-indexing all entities in an application
*/
@@ -46,48 +30,75 @@ public interface ReIndexService {
* Perform an index rebuild
*
* @param indexServiceRequestBuilder The builder to build the request
- * @return
*/
- IndexResponse rebuildIndex(final IndexServiceRequestBuilder indexServiceRequestBuilder);
+ IndexResponse rebuildIndex( final IndexServiceRequestBuilder indexServiceRequestBuilder );
/**
* Generate a build for the index
- * @return
*/
IndexServiceRequestBuilder getBuilder();
+
+ /**
+ * Get the status of a job
+ * @param jobId The jobId returned during the rebuild index
+ * @return The response carrying the job's status, number processed and last updated time
+ */
+ IndexResponse getStatus( final String jobId );
+
+
/**
* The response when requesting a re-index operation
*/
class IndexResponse {
- final String cursor;
- final ConnectableObservable<EdgeScope> indexedEdgecount;
+ final String jobId;
+ final String status;
+ final long numberProcessed;
+ final long lastUpdated;
+
+
+ public IndexResponse( final String jobId, final String status, final long numberProcessed,
+ final long lastUpdated ) {
+ this.jobId = jobId;
+ this.status = status;
+ this.numberProcessed = numberProcessed;
+ this.lastUpdated = lastUpdated;
+ }
- public IndexResponse( final String cursor, final ConnectableObservable<EdgeScope> indexedEdgecount ) {
- this.cursor = cursor;
- this.indexedEdgecount = indexedEdgecount;
+ /**
+ * Get the jobId used to resume this operation
+ */
+ public String getJobId() {
+ return jobId;
+ }
+
+
+ /**
+ * Get the last updated time, as a long
+ * @return
+ */
+ public long getLastUpdated() {
+ return lastUpdated;
}
/**
- * Get the cursor used to resume this operation
+ * Get the number of records processed
* @return
*/
- public String getCursor() {
- return cursor;
+ public long getNumberProcessed() {
+ return numberProcessed;
}
/**
- * Return the observable of all edges to be indexed.
- *
- * Note that after subscribing "connect" will need to be called to ensure that processing begins
+ * Get the status
* @return
*/
- public ConnectableObservable<EdgeScope> getCount() {
- return indexedEdgecount;
+ public String getStatus() {
+ return status;
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/20c9b350/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
index a2fa09a..d828fc2 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
@@ -20,29 +20,32 @@
package org.apache.usergrid.corepersistence.index;
-import java.util.concurrent.TimeUnit;
+import java.util.List;
import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
+import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializerUtil;
+import org.apache.usergrid.corepersistence.pipeline.read.CursorSeek;
import org.apache.usergrid.corepersistence.rx.impl.AllApplicationsObservable;
import org.apache.usergrid.corepersistence.rx.impl.AllEntityIdsObservable;
import org.apache.usergrid.corepersistence.rx.impl.EdgeScope;
import org.apache.usergrid.corepersistence.util.CpNamingUtils;
-import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
-import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.core.util.StringUtils;
+import org.apache.usergrid.persistence.graph.Edge;
import org.apache.usergrid.persistence.map.MapManager;
import org.apache.usergrid.persistence.map.MapManagerFactory;
import org.apache.usergrid.persistence.map.MapScope;
import org.apache.usergrid.persistence.map.impl.MapScopeImpl;
import org.apache.usergrid.persistence.model.util.UUIDGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import rx.Observable;
-import rx.observables.ConnectableObservable;
+import rx.schedulers.Schedulers;
@Singleton
@@ -51,14 +54,18 @@ public class ReIndexServiceImpl implements ReIndexService {
private static final MapScope RESUME_MAP_SCOPE =
new MapScopeImpl( CpNamingUtils.getManagementApplicationId(), "reindexresume" );
- //Keep cursors to resume re-index for 1 day. This is far beyond it's useful real world implications anyway.
+ //Keep cursors to resume re-index for 10 days. This is far beyond its useful real world implications anyway.
private static final int INDEX_TTL = 60 * 60 * 24 * 10;
+ private static final String MAP_CURSOR_KEY = "cursor";
+ private static final String MAP_COUNT_KEY = "count";
+ private static final String MAP_STATUS_KEY = "status";
+ private static final String MAP_UPDATED_KEY = "lastUpdated";
+
private final AllApplicationsObservable allApplicationsObservable;
private final AllEntityIdsObservable allEntityIdsObservable;
private final IndexProcessorFig indexProcessorFig;
- private final RxTaskScheduler rxTaskScheduler;
private final MapManager mapManager;
private final AsyncEventService indexService;
@@ -66,69 +73,61 @@ public class ReIndexServiceImpl implements ReIndexService {
@Inject
public ReIndexServiceImpl( final AllEntityIdsObservable allEntityIdsObservable,
final MapManagerFactory mapManagerFactory,
- final AllApplicationsObservable allApplicationsObservable, final IndexProcessorFig indexProcessorFig,
- final RxTaskScheduler rxTaskScheduler, final AsyncEventService indexService ) {
+ final AllApplicationsObservable allApplicationsObservable,
+ final IndexProcessorFig indexProcessorFig, final AsyncEventService indexService ) {
this.allEntityIdsObservable = allEntityIdsObservable;
this.allApplicationsObservable = allApplicationsObservable;
this.indexProcessorFig = indexProcessorFig;
- this.rxTaskScheduler = rxTaskScheduler;
this.indexService = indexService;
this.mapManager = mapManagerFactory.createMapManager( RESUME_MAP_SCOPE );
}
-
-
-
@Override
public IndexResponse rebuildIndex( final IndexServiceRequestBuilder indexServiceRequestBuilder ) {
- //load our last emitted Scope if a cursor is present
- if ( indexServiceRequestBuilder.getCursor().isPresent() ) {
- throw new UnsupportedOperationException( "Build this" );
- }
+ //load our last emitted Scope if a cursor is present
+ final Optional<EdgeScope> cursor = parseCursor( indexServiceRequestBuilder.getCursor() );
+
+ //seek to the cursor's edge when resuming; without a cursor the seek value is absent
+ final CursorSeek<Edge> cursorSeek = getResumeEdge( cursor );
final Optional<ApplicationScope> appId = indexServiceRequestBuilder.getApplicationScope();
- final Observable<ApplicationScope> applicationScopes = appId.isPresent()? Observable.just( appId.get() ) : allApplicationsObservable.getData();
+ //checkArgument throws when its expression is false, so assert the two are NOT supplied together
+ Preconditions.checkArgument( !( cursor.isPresent() && appId.isPresent() ),
+ "You cannot specify an app id and a cursor. When resuming with cursor you must omit the appid" );
+ final Observable<ApplicationScope> applicationScopes = getApplications( cursor, appId );
- final String newCursor = StringUtils.sanitizeUUID( UUIDGenerator.newTimeUUID() );
+ final String jobId = StringUtils.sanitizeUUID( UUIDGenerator.newTimeUUID() );
final long modifiedSince = indexServiceRequestBuilder.getUpdateTimestamp().or( Long.MIN_VALUE );
//create an observable that loads each entity and indexes it, start it running with publish
- final ConnectableObservable<EdgeScope> runningReIndex =
- allEntityIdsObservable.getEdgesToEntities( applicationScopes,
- indexServiceRequestBuilder.getCollectionName() )
-
- //for each edge, create our scope and index on it
- .doOnNext( edge -> indexService.index(
- new EntityIndexOperation( edge.getApplicationScope(), edge.getEdge().getTargetNode(),
- modifiedSince ) ) ).publish();
+ final Observable<EdgeScope> runningReIndex = allEntityIdsObservable.getEdgesToEntities( applicationScopes,
+ indexServiceRequestBuilder.getCollectionName(), cursorSeek.getSeekValue() )
+ //for each edge, create our scope and index on it
+ .doOnNext( edge -> indexService.index(
+ new EntityIndexOperation( edge.getApplicationScope(), edge.getEdge().getTargetNode(),
+ modifiedSince ) ) );
//start our sampler and state persistence
//take a sample every sample interval to allow us to resume state with minimal loss
- runningReIndex.sample( indexProcessorFig.getReIndexSampleInterval(), TimeUnit.MILLISECONDS,
- rxTaskScheduler.getAsyncIOScheduler() )
- .doOnNext( edge -> {
-
-// final String serializedState = SerializableMapper.asString( edge );
-//
-// mapManager.putString( newCursor, serializedState, INDEX_TTL );
- } ).subscribe();
+ runningReIndex.buffer( indexProcessorFig.getUpdateInterval() )
+ //create our flushing collector and flush the edge scopes to it
+ .collect( () -> new FlushingCollector( jobId ),
+ ( ( flushingCollector, edgeScopes ) -> flushingCollector.flushBuffer( edgeScopes ) ) ).doOnNext( flushingCollector-> flushingCollector.complete() )
+ //subscribe on our I/O scheduler and run the task
+ .subscribeOn( Schedulers.io() ).subscribe();
- //start pushing to both
- runningReIndex.connect();
-
-
- return new IndexResponse( newCursor, runningReIndex );
+ return new IndexResponse( jobId, "Started", 0, 0 );
}
@@ -136,6 +135,155 @@ public class ReIndexServiceImpl implements ReIndexService {
public IndexServiceRequestBuilder getBuilder() {
return new IndexServiceRequestBuilderImpl();
}
+
+ //look up the persisted state of a re-index job; a null jobId is rejected up front by checkNotNull
+ @Override
+ public IndexResponse getStatus( final String jobId ) {
+ Preconditions.checkNotNull( jobId, "jobId must not be null" );
+ return getIndexResponse( jobId );
+ }
+
+
+ /**
+ * Simple collector that keeps a running count and persists progress every time a buffer is provided. Writes final state when complete
+ */
+ private class FlushingCollector {
+
+ private final String jobId;
+ private long count;
+
+
+ private FlushingCollector( final String jobId ) {
+ this.jobId = jobId;
+ }
+
+ /** Add the buffer's size to the count, persist its last edge as the cursor, then write "InProgress" meta */
+ public void flushBuffer( final List<EdgeScope> buffer ) {
+ count += buffer.size();
+
+ //write our cursor state from the last edge in the buffer; nothing to write for an empty buffer
+ if ( buffer.size() > 0 ) {
+ writeCursorState( jobId, buffer.get( buffer.size() - 1 ) );
+ }
+
+ writeStateMeta( jobId, "InProgress", count, System.currentTimeMillis() );
+ }
+ /** Write the terminal "Complete" status together with the final count */
+ public void complete(){
+ writeStateMeta( jobId, "Complete", count, System.currentTimeMillis() );
+ }
+ }
+
+
+ /**
+ * Build the seek used to resume: wraps the cursor's edge when a cursor is present, otherwise an absent value
+ * @param edgeScope The optional edge scope from the cursor
+ * @return A cursor seek holding the edge from the scope, or an absent optional when there is no scope
+ */
+ private CursorSeek<Edge> getResumeEdge( final Optional<EdgeScope> edgeScope ) {
+
+
+ if ( edgeScope.isPresent() ) {
+ return new CursorSeek<>( Optional.of( edgeScope.get().getEdge() ) );
+ }
+
+ return new CursorSeek<>( Optional.absent() );
+ }
+
+
+ /**
+ * Generate an observable for our application scopes, based on the cursor and the optional app id
+ */
+ private Observable<ApplicationScope> getApplications( final Optional<EdgeScope> cursor,
+ final Optional<ApplicationScope> appId ) {
+ //cursor is present use it and skip until we hit that app
+ if ( cursor.isPresent() ) {
+
+ final EdgeScope cursorValue = cursor.get();
+ //emit all applications, dropping those that come before the application scope stored in the cursor
+ return allApplicationsObservable.getData().skipWhile(
+ applicationScope -> !cursorValue.getApplicationScope().equals( applicationScope ) );
+ }
+ //this is intentional: the cursor branch above wins, an explicit app id is only used when there is no cursor
+ else if ( appId.isPresent() ) {
+ return Observable.just( appId.get() );
+ }
+
+ return allApplicationsObservable.getData();
+ }
+
+
+ /**
+ * Swap our cursor (the jobId of a previous run) for the edge scope persisted for it, if any
+ * @param cursor The optional jobId to resume from
+ * @return The stored edge scope, or absent when no cursor was supplied or nothing is stored for it
+ */
+ private Optional<EdgeScope> parseCursor( final Optional<String> cursor ) {
+ if ( !cursor.isPresent() ) {
+ return Optional.absent();
+ }
+
+ //the state is written under jobId + MAP_CURSOR_KEY, so it must be read back with the same key
+ final String persistedCursor = mapManager.getString( cursor.get() + MAP_CURSOR_KEY );
+
+ if ( persistedCursor == null ) {
+ return Optional.absent();
+ }
+
+ final JsonNode node = CursorSerializerUtil.fromString( persistedCursor );
+ final EdgeScope edgeScope = EdgeScopeSerializer.INSTANCE.fromJsonNode( node, CursorSerializerUtil.getMapper() );
+
+ return Optional.of( edgeScope );
+ }
+
+
+ /**
+ * Write the cursor state for the job to the map in cassandra
+ */
+ private void writeCursorState( final String jobId, final EdgeScope edge ) {
+ //turn the edge scope into a json node using the shared cursor mapper
+ final JsonNode node = EdgeScopeSerializer.INSTANCE.toNode( CursorSerializerUtil.getMapper(), edge );
+ //encode the node as a string so it can be stored as a map value
+ final String serializedState = CursorSerializerUtil.asString( node );
+ //keyed by jobId + MAP_CURSOR_KEY and written with INDEX_TTL
+ mapManager.putString( jobId + MAP_CURSOR_KEY, serializedState, INDEX_TTL );
+ }
+
+
+ /**
+ * Write our state meta data into cassandra so everyone can see it
+ * @param jobId The id of the job, used as the prefix of every key written
+ * @param status The status string to store for the job
+ * @param processedCount The number of records processed so far
+ * @param lastUpdated The time of this update, as a long
+ */
+ private void writeStateMeta( final String jobId, final String status, final long processedCount,
+ final long lastUpdated ) {
+ //NOTE(review): unlike the cursor entry these are written without INDEX_TTL - confirm that is intended
+ mapManager.putString( jobId + MAP_STATUS_KEY, status );
+ mapManager.putLong( jobId + MAP_COUNT_KEY, processedCount );
+ mapManager.putLong( jobId + MAP_UPDATED_KEY, lastUpdated );
+ }
+
+
+ /**
+ * Load the persisted status, processed count and last updated time for a job and wrap them in a response
+ * @param jobId The id of the job whose state should be loaded
+ * @return The response built from the stored state
+ * @throws IllegalArgumentException if no status is stored for the jobId
+ */
+ private IndexResponse getIndexResponse( final String jobId ) {
+ final String status = mapManager.getString( jobId + MAP_STATUS_KEY );
+
+ if ( status == null ) {
+ throw new IllegalArgumentException( "Could not find a job with id " + jobId );
+ }
+
+ final long processedCount = mapManager.getLong( jobId + MAP_COUNT_KEY );
+ //the timestamp lives under its own key, distinct from the count key
+ final long lastUpdated = mapManager.getLong( jobId + MAP_UPDATED_KEY );
+ return new IndexResponse( jobId, status, processedCount, lastUpdated );
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/20c9b350/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorSerializerUtil.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorSerializerUtil.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorSerializerUtil.java
index fea0364..7acdd00 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorSerializerUtil.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/CursorSerializerUtil.java
@@ -20,10 +20,15 @@
package org.apache.usergrid.corepersistence.pipeline.cursor;
-import com.fasterxml.jackson.core.Base64Variant;
-import com.fasterxml.jackson.core.Base64Variants;
+import java.io.IOException;
+import java.util.Base64;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.dataformat.smile.SmileFactory;
+import com.google.common.base.Preconditions;
/**
@@ -35,9 +40,54 @@ public class CursorSerializerUtil {
private static final ObjectMapper MAPPER = new ObjectMapper( SMILE_FACTORY );
+ /**
+ * Arbitrary number, just meant to keep us from having a DOS issue
+ */
+ private static final int MAX_SIZE = 1024;
+
public static ObjectMapper getMapper() {
return MAPPER;
}
+
+ /**
+ * Turn the json node in to a base64 encoded SMILE binary
+ * @param node The json node to serialize
+ * @return The url safe base64 encoding of the serialized node
+ */
+ public static String asString( final JsonNode node ) {
+ final byte[] output;
+ try {
+ output = MAPPER.writeValueAsBytes( node );
+ }
+ catch ( JsonProcessingException e ) {
+ throw new RuntimeException( "Unable to create output from json node " + node, e );
+ }
+
+ //generate a base64 url safe string
+ return Base64.getUrlEncoder().encodeToString( output );
+ }
+
+
+ /**
+ * Parse the base64 encoded binary string
+ * @param base64EncodedJson The url safe base64 string, at most MAX_SIZE chars long
+ * @return The json node read from the decoded bytes
+ * @throws RuntimeException wrapping the IOException when the decoded bytes cannot be parsed
+ */
+ public static JsonNode fromString( final String base64EncodedJson ) {
+
+ Preconditions.checkArgument( base64EncodedJson.length() <= MAX_SIZE,
+ "Your cursor must be less than " + MAX_SIZE + " chars in length" );
+
+ final byte[] data = Base64.getUrlDecoder().decode( base64EncodedJson );
+
+ try {
+ return MAPPER.readTree( data );
+ }
+ catch ( IOException e ) {
+ throw new RuntimeException( "Unable to parse json node from string " + base64EncodedJson, e );
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/20c9b350/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/RequestCursor.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/RequestCursor.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/RequestCursor.java
index 870edbb..dc6ae71 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/RequestCursor.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/RequestCursor.java
@@ -37,10 +37,6 @@ import com.google.common.base.Preconditions;
*/
public class RequestCursor {
- /**
- * Aritrary number, just meant to keep us from having a DOS issue
- */
- private static final int MAX_SIZE = 1024;
private static final int MAX_CURSOR_COUNT = 100;
@@ -83,11 +79,8 @@ public class RequestCursor {
try {
- Preconditions.checkArgument( cursor.length() <= MAX_SIZE, "Your cursor must be less than " + MAX_SIZE + " chars in length");
-
- final byte[] data = Base64.getUrlDecoder().decode( cursor );
- JsonNode jsonNode = MAPPER.readTree( data );
+ JsonNode jsonNode = CursorSerializerUtil.fromString( cursor );
Preconditions
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/20c9b350/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/ResponseCursor.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/ResponseCursor.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/ResponseCursor.java
index dbd8b88..dc4bf39 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/ResponseCursor.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/ResponseCursor.java
@@ -20,11 +20,8 @@
package org.apache.usergrid.corepersistence.pipeline.cursor;
-import java.util.Base64;
-
import org.apache.usergrid.corepersistence.pipeline.read.EdgePath;
-import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -53,8 +50,8 @@ public class ResponseCursor {
/**
- * Lazyily encoded deliberately. If the user doesn't care about a cursor and is using streams, we dont' want to take the
- * time to calculate it
+ * Lazily encoded deliberately. If the user doesn't care about a cursor and is using streams, we don't want to
+ * take the time to calculate it
*/
public Optional<String> encodeAsString() {
@@ -68,42 +65,34 @@ public class ResponseCursor {
return encodedValue;
}
+ //no edge path, short circuit
- try {
-
- //no edge path, short circuit
-
- final ObjectNode map = MAPPER.createObjectNode();
+ final ObjectNode map = MAPPER.createObjectNode();
- Optional<EdgePath> current = edgePath;
+ Optional<EdgePath> current = edgePath;
- //traverse each edge and add them to our json
- do {
+ //traverse each edge and add them to our json
+ do {
- final EdgePath edgePath = current.get();
- final Object cursorValue = edgePath.getCursorValue();
- final CursorSerializer serializer = edgePath.getSerializer();
- final int filterId = edgePath.getFilterId();
+ final EdgePath edgePath = current.get();
+ final Object cursorValue = edgePath.getCursorValue();
+ final CursorSerializer serializer = edgePath.getSerializer();
+ final int filterId = edgePath.getFilterId();
- final JsonNode serialized = serializer.toNode( MAPPER, cursorValue );
- map.put( String.valueOf( filterId ), serialized );
+ final JsonNode serialized = serializer.toNode( MAPPER, cursorValue );
+ map.put( String.valueOf( filterId ), serialized );
- current = current.get().getPrevious();
- }
- while ( current.isPresent() );
+ current = current.get().getPrevious();
+ }
+ while ( current.isPresent() );
- final byte[] output = MAPPER.writeValueAsBytes( map );
+ //generate a base64 url safe string
+ final String value = CursorSerializerUtil.asString( map );
- //generate a base64 url save string
- final String value = Base64.getUrlEncoder().encodeToString( output );
+ encodedValue = Optional.of( value );
- encodedValue = Optional.of( value );
- }
- catch ( JsonProcessingException e ) {
- throw new CursorParseException( "Unable to serialize cursor", e );
- }
return encodedValue;
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/20c9b350/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractPathFilter.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractPathFilter.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractPathFilter.java
index c68dc4a..0f9ac9b 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractPathFilter.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/AbstractPathFilter.java
@@ -76,34 +76,4 @@ public abstract class AbstractPathFilter<T, R, C extends Serializable> extends A
* Return the class to be used when parsing the cursor
*/
protected abstract CursorSerializer<C> getCursorSerializer();
-
-
- /**
- * An internal class that holds a mutable state. When resuming, we only ever honor the seek value on the first call. Afterwards, we will seek from the beginning on newly emitted values.
- * Calling get will return the first value to seek, or absent if not specified. Subsequent calls will return absent. Callers should treat the results as seek values for each operation
- */
- protected static class CursorSeek<C> {
-
- private Optional<C> seek;
-
- private CursorSeek(final Optional<C> cursorValue){
- seek = cursorValue;
- }
-
-
- /**
- * Get the seek value to use when searching
- * @return
- */
- public Optional<C> getSeekValue(){
- final Optional<C> toReturn = seek;
-
- seek = Optional.absent();
-
- return toReturn;
- }
-
-
-
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/20c9b350/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CursorSeek.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CursorSeek.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CursorSeek.java
new file mode 100644
index 0000000..b803658
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/read/CursorSeek.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.pipeline.read;
+
+
+import com.google.common.base.Optional;
+
+
+/**
+ * An internal class that holds a mutable state. When resuming, we only ever honor the seek value on the first call. Afterwards, we will seek from the beginning on newly emitted values.
+ * Calling getSeekValue will return the first value to seek, or absent if not specified. Subsequent calls will return absent. Callers should treat the results as seek values for each operation
+ */
+public class CursorSeek<C> {
+
+ private Optional<C> seek;
+
+ public CursorSeek( final Optional<C> cursorValue ){
+ seek = cursorValue;
+ }
+
+
+ /**
+ * Get the seek value to use when searching
+ * @return
+ */
+ public Optional<C> getSeekValue(){
+ final Optional<C> toReturn = seek;
+
+ seek = Optional.absent();
+
+ return toReturn;
+ }
+
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/20c9b350/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservable.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservable.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservable.java
index aada240..9070609 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservable.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservable.java
@@ -24,6 +24,7 @@ import com.google.common.base.Optional;
import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.graph.Edge;
import rx.Observable;
@@ -44,8 +45,9 @@ public interface AllEntityIdsObservable {
* Get all edges that represent edges to entities in the system
* @param appScopes
* @param edgeType The edge type to use (if specified)
+ * @param lastEdge The edge to resume processing from
* @return
*/
- Observable<EdgeScope> getEdgesToEntities(final Observable<ApplicationScope> appScopes, final Optional<String> edgeType);
+ Observable<EdgeScope> getEdgesToEntities(final Observable<ApplicationScope> appScopes, final Optional<String> edgeType, final Optional<Edge> lastEdge);
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/20c9b350/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservableImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservableImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservableImpl.java
index 6a95e7b..0420a32 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservableImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservableImpl.java
@@ -28,6 +28,7 @@ import com.google.inject.Singleton;
import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.graph.Edge;
import org.apache.usergrid.persistence.graph.GraphManager;
import org.apache.usergrid.persistence.graph.GraphManagerFactory;
import org.apache.usergrid.persistence.graph.serialization.EdgesObservable;
@@ -81,12 +82,12 @@ public class AllEntityIdsObservableImpl implements AllEntityIdsObservable {
@Override
- public Observable<EdgeScope> getEdgesToEntities( final Observable<ApplicationScope> appScopes, final Optional<String> edgeType) {
+ public Observable<EdgeScope> getEdgesToEntities( final Observable<ApplicationScope> appScopes, final Optional<String> edgeType, final Optional<Edge> lastEdge) {
return appScopes.flatMap( applicationScope -> {
final GraphManager gm = graphManagerFactory.createEdgeManager( applicationScope );
- return edgesObservable.edgesFromSourceDescending( gm, applicationScope.getApplication(), edgeType )
+ return edgesObservable.edgesFromSourceDescending( gm, applicationScope.getApplication(), edgeType, lastEdge )
.map( edge -> new EdgeScope(applicationScope, edge ));
} );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/20c9b350/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java b/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
index a17c925..cb9919f 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
@@ -27,6 +27,7 @@ import java.util.concurrent.TimeUnit;
import com.google.common.base.Optional;
import org.apache.commons.lang.RandomStringUtils;
+import org.apache.usergrid.corepersistence.index.IndexServiceRequestBuilder;
import org.apache.usergrid.corepersistence.index.ReIndexService;
import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
import org.junit.After;
@@ -196,7 +197,9 @@ public class PerformanceEntityRebuildIndexTest extends AbstractCoreIT {
try {
- reIndexService.rebuildIndex( Optional.of( em.getApplicationId()), Optional.<String>of("catherders"), Optional.absent(), Optional.absent() );
+ final IndexServiceRequestBuilder builder = reIndexService.getBuilder().withApplicationId( em.getApplicationId() ).withCollection( "catherders" );
+
+ reIndexService.rebuildIndex(builder );
reporter.report();
registry.remove( meterName );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/20c9b350/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java
index 964e13d..78a1d4b 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java
@@ -42,16 +42,6 @@ public interface EdgesObservable {
/**
- * Return an observable of all edges from a source node. Ordered ascending, from the startTimestamp if specified
- * @param gm
- * @param sourceNode
- * @param edgeType The edge type if specified. Otherwise all types will be used
- * @return
- */
- Observable<Edge> edgesFromSourceDescending( final GraphManager gm, final Id sourceNode,
- final Optional<String> edgeType );
-
- /**
* Get all edges from the source node with the target type
* @param gm
* @param sourceNode
@@ -67,4 +57,15 @@ public interface EdgesObservable {
* @return
*/
Observable<Edge> edgesToTarget(final GraphManager gm, final Id targetNode);
+
+ /**
+ * Return an observable of all edges from a source node. Ordered descending, resuming after the given edge if specified
+ * @param gm
+ * @param sourceNode
+ * @param edgeType The edge type if specified. Otherwise all types will be used
+ * @param resume The edge to start seeking after. Otherwise starts at the most recent
+ * @return
+ */
+ Observable<Edge> edgesFromSourceDescending( final GraphManager gm, final Id sourceNode,
+ final Optional<String> edgeType, final Optional<Edge> resume );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/20c9b350/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java
index 7240798..18274ac 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java
@@ -72,7 +72,7 @@ public class EdgesObservableImpl implements EdgesObservable {
@Override
public Observable<Edge> edgesFromSourceDescending( final GraphManager gm, final Id sourceNode,
- final Optional<String> edgeTypeInput ) {
+ final Optional<String> edgeTypeInput, final Optional<Edge> resume ) {
@@ -86,7 +86,7 @@ public class EdgesObservableImpl implements EdgesObservable {
return gm.loadEdgesFromSource(
new SimpleSearchByEdgeType( sourceNode, edgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
- Optional.<Edge>absent() ) );
+ resume ) );
} );
}
[03/10] incubator-usergrid git commit: Refactored observable methods
to correct name
Posted by gr...@apache.org.
Refactored observable methods to correct name
Added timestamp to message so that consumers can filter values for faster processing
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/cb179d35
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/cb179d35
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/cb179d35
Branch: refs/heads/two-dot-o-dev
Commit: cb179d3512952203d20c00773789df23e27f147d
Parents: b1d9ac2
Author: Todd Nine <tn...@apigee.com>
Authored: Tue May 12 17:40:20 2015 -0700
Committer: Todd Nine <tn...@apigee.com>
Committed: Tue May 12 17:40:20 2015 -0700
----------------------------------------------------------------------
.../asyncevents/EventBuilder.java | 6 +-
.../asyncevents/EventBuilderImpl.java | 18 ++-
.../asyncevents/InMemoryAsyncEventService.java | 5 +-
.../asyncevents/SQSAsyncEventService.java | 9 +-
.../index/EntityIndexOperation.java | 46 +++++++
.../index/IndexServiceRequestBuilder.java | 88 +++++++++++++
.../index/IndexServiceRequestBuilderImpl.java | 130 +++++++++++++++++++
.../corepersistence/index/ReIndexAction.java | 2 +-
.../corepersistence/index/ReIndexService.java | 13 +-
.../index/ReIndexServiceImpl.java | 44 ++++---
.../rx/impl/AllEntityIdsObservable.java | 3 +-
.../rx/impl/AllEntityIdsObservableImpl.java | 5 +-
.../util/SerializableMapper.java | 91 -------------
.../rx/EdgesToTargetObservableIT.java | 4 +-
.../graph/serialization/EdgesObservable.java | 7 +-
.../serialization/impl/EdgesObservableImpl.java | 8 +-
.../impl/TargetIdObservableImpl.java | 2 +-
.../impl/migration/EdgeDataMigrationImpl.java | 2 +-
18 files changed, 342 insertions(+), 141 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cb179d35/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java
index f48451c..f9f157e 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilder.java
@@ -22,8 +22,8 @@ package org.apache.usergrid.corepersistence.asyncevents;
import java.util.List;
+import org.apache.usergrid.corepersistence.index.EntityIndexOperation;
import org.apache.usergrid.persistence.collection.MvccLogEntry;
-import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.graph.Edge;
import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
@@ -72,10 +72,10 @@ public interface EventBuilder {
/**
* Re-index an entity in the scope provided
- * @param entityIdScope
+ * @param entityIndexOperation
* @return
*/
- Observable<IndexOperationMessage> index( EntityIdScope entityIdScope );
+ Observable<IndexOperationMessage> index( EntityIndexOperation entityIndexOperation );
/**
* A bean to hold both our observables so the caller can choose the subscription mechanism. Note that
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cb179d35/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
index c0d82d2..d35ed6d 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java
@@ -25,12 +25,13 @@ import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.usergrid.corepersistence.index.EntityIndexOperation;
import org.apache.usergrid.corepersistence.index.IndexService;
+import org.apache.usergrid.persistence.Schema;
import org.apache.usergrid.persistence.collection.EntityCollectionManager;
import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory;
import org.apache.usergrid.persistence.collection.MvccLogEntry;
import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
-import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.graph.Edge;
import org.apache.usergrid.persistence.graph.GraphManager;
@@ -38,6 +39,7 @@ import org.apache.usergrid.persistence.graph.GraphManagerFactory;
import org.apache.usergrid.persistence.index.impl.IndexOperationMessage;
import org.apache.usergrid.persistence.model.entity.Entity;
import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.persistence.model.field.Field;
import com.google.inject.Inject;
import com.google.inject.Singleton;
@@ -140,14 +142,20 @@ public class EventBuilderImpl implements EventBuilder {
@Override
- public Observable<IndexOperationMessage> index( final EntityIdScope entityIdScope ) {
+ public Observable<IndexOperationMessage> index( final EntityIndexOperation entityIndexOperation ) {
- final ApplicationScope applicationScope = entityIdScope.getApplicationScope();
+ final ApplicationScope applicationScope = entityIndexOperation.getApplicationScope();
- final Id entityId = entityIdScope.getId();
+ final Id entityId = entityIndexOperation.getId();
//load the entity
- return entityCollectionManagerFactory.createCollectionManager( applicationScope ).load( entityId )
+ return entityCollectionManagerFactory.createCollectionManager( applicationScope ).load( entityId ).filter(
+ entity -> {
+ final Field<Long> modified = entity.getField( Schema.PROPERTY_MODIFIED );
+
+ //only re-index if it has a modified timestamp and was updated at or after our timestamp
+ return modified != null && modified.getValue() >= entityIndexOperation.getUpdatedSince();
+ } )
//perform indexing on the task scheduler and start it
.flatMap( entity -> indexService.indexEntity( applicationScope, entity ) );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cb179d35/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
index 6faa695..96966bf 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/InMemoryAsyncEventService.java
@@ -23,6 +23,7 @@ package org.apache.usergrid.corepersistence.asyncevents;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.usergrid.corepersistence.index.EntityIndexOperation;
import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
@@ -97,10 +98,10 @@ public class InMemoryAsyncEventService implements AsyncEventService {
@Override
- public void index( final EntityIdScope entityIdScope ) {
+ public void index( final EntityIndexOperation entityIndexOperation ) {
- run(eventBuilder.index( entityIdScope ));
+ run(eventBuilder.index( entityIndexOperation ));
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cb179d35/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/SQSAsyncEventService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/SQSAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/SQSAsyncEventService.java
index 415e5e8..1dbfd4e 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/SQSAsyncEventService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/SQSAsyncEventService.java
@@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.usergrid.corepersistence.index.EntityIndexOperation;
import org.apache.usergrid.corepersistence.index.IndexProcessorFig;
import org.apache.usergrid.corepersistence.index.IndexService;
import org.apache.usergrid.exception.NotImplementedException;
@@ -184,7 +185,7 @@ public class SQSAsyncEventService implements AsyncEventService {
}
- @Override
+// @Override
public void index( final EntityIdScope entityIdScope ) {
//queue the re-inex operation
offer( entityIdScope );
@@ -346,4 +347,10 @@ public class SQSAsyncEventService implements AsyncEventService {
subscriptions.add( subscription );
}
}
+
+
+ @Override
+ public void index( final EntityIndexOperation entityIdScope ) {
+ throw new NotImplementedException( "Implement me" );
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cb179d35/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/EntityIndexOperation.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/EntityIndexOperation.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/EntityIndexOperation.java
new file mode 100644
index 0000000..3548bbe
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/EntityIndexOperation.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.index;
+
+
+import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.model.entity.Id;
+
+
+/**
+ * The operation for re-indexing an entity. The entity should be updated
+ * with an updated timestamp >= updatedSince.
+ */
+public class EntityIndexOperation extends EntityIdScope {
+
+ private final long updatedSince;
+
+
+ public EntityIndexOperation( final ApplicationScope applicationScope, final Id id, final long updatedSince ) {
+ super( applicationScope, id );
+ this.updatedSince = updatedSince;
+ }
+
+
+ public long getUpdatedSince() {
+ return updatedSince;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cb179d35/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceRequestBuilder.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceRequestBuilder.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceRequestBuilder.java
new file mode 100644
index 0000000..07160d8
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceRequestBuilder.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.index;
+
+
+import java.util.UUID;
+
+import org.elasticsearch.action.index.IndexRequestBuilder;
+
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+
+import com.google.common.base.Optional;
+
+
+/**
+ * A builder interface to build our re-index request
+ */
+public interface IndexServiceRequestBuilder {
+
+ /**
+ * Set the application id
+ */
+ IndexServiceRequestBuilder withApplicationId( final UUID applicationId );
+
+ /**
+ * Set the collection name. If not set, every collection will be reindexed
+ * @param collectionName
+ * @return
+ */
+ IndexServiceRequestBuilder withCollection( final String collectionName );
+
+ /**
+ * Set our cursor to resume processing
+ * @param cursor
+ * @return
+ */
+ IndexServiceRequestBuilder withCursor(final String cursor);
+
+
+ /**
+ * Set the timestamp to re-index entities updated >= this timestamp
+ * @param timestamp
+ * @return
+ */
+ IndexServiceRequestBuilder withStartTimestamp(final Long timestamp);
+
+
+ /**
+ * Get the application scope
+ * @return
+ */
+ Optional<ApplicationScope> getApplicationScope();
+
+ /**
+ * Get the collection name
+ * @return
+ */
+ Optional<String> getCollectionName();
+
+ /**
+ * Get the cursor
+ * @return
+ */
+ Optional<String> getCursor();
+
+ /**
+ * Get the updated since timestamp
+ * @return
+ */
+ Optional<Long> getUpdateTimestamp();
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cb179d35/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceRequestBuilderImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceRequestBuilderImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceRequestBuilderImpl.java
new file mode 100644
index 0000000..3466674
--- /dev/null
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceRequestBuilderImpl.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.corepersistence.index;
+
+
+import java.util.UUID;
+
+
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
+import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+
+import com.google.common.base.Optional;
+
+
+public class IndexServiceRequestBuilderImpl implements IndexServiceRequestBuilder {
+
+
+ /**
+ *
+ final Observable<ApplicationScope> applicationScopes = appId.isPresent()? Observable.just( getApplicationScope(appId.get()) ) : allApplicationsObservable.getData();
+
+ final String newCursor = StringUtils.sanitizeUUID( UUIDGenerator.newTimeUUID() );
+
+ //create an observable that loads each entity and indexes it, start it running with publish
+ final ConnectableObservable<EdgeScope> runningReIndex =
+ allEntityIdsObservable.getEdgesToEntities( applicationScopes, collection, startTimestamp )
+
+ //for each edge, create our scope and index on it
+ .doOnNext( edge -> indexService.index( new EntityIdScope( edge.getApplicationScope(), edge.getEdge().getTargetNode() ) ) ).publish();
+
+
+
+ //start our sampler and state persistence
+ //take a sample every sample interval to allow us to resume state with minimal loss
+ runningReIndex.sample( indexProcessorFig.getReIndexSampleInterval(), TimeUnit.MILLISECONDS,
+ rxTaskScheduler.getAsyncIOScheduler() )
+ .doOnNext( edge -> {
+
+ final String serializedState = SerializableMapper.asString( edge );
+
+ mapManager.putString( newCursor, serializedState, INDEX_TTL );
+ } ).subscribe();
+
+
+ */
+
+ private Optional<UUID> withApplicationId;
+ private Optional<String> withCollectionName;
+ private Optional<String> cursor;
+ private Optional<Long> updateTimestamp;
+
+
+ /***
+ *
+ * @param applicationId
+ * @return
+ */
+ @Override
+ public IndexServiceRequestBuilder withApplicationId( final UUID applicationId ) {
+ this.withApplicationId = Optional.fromNullable(applicationId);
+ return this;
+ }
+
+
+ @Override
+ public IndexServiceRequestBuilder withCollection( final String collectionName ) {
+ this.withCollectionName = Optional.fromNullable( collectionName );
+ return this;
+ }
+
+
+ @Override
+ public IndexServiceRequestBuilder withCursor( final String cursor ) {
+ this.cursor = Optional.fromNullable( cursor );
+ return this;
+ }
+
+
+ @Override
+ public IndexServiceRequestBuilder withStartTimestamp( final Long timestamp ) {
+ this.updateTimestamp = Optional.fromNullable(timestamp );
+ return this;
+ }
+
+
+ @Override
+ public Optional<ApplicationScope> getApplicationScope() {
+
+ if(this.withApplicationId.isPresent()){
+ return Optional.of( CpNamingUtils.getApplicationScope( withApplicationId.get()));
+ }
+
+ return Optional.absent();
+ }
+
+
+ @Override
+ public Optional<String> getCollectionName() {
+ return withCollectionName;
+ }
+
+
+ @Override
+ public Optional<String> getCursor() {
+ return cursor;
+ }
+
+
+ @Override
+ public Optional<Long> getUpdateTimestamp() {
+ return updateTimestamp;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cb179d35/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java
index b878246..672b3c8 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexAction.java
@@ -33,5 +33,5 @@ public interface ReIndexAction {
* Index this entity with the specified scope
* @param entityIdScope
*/
- void index( final EntityIdScope entityIdScope );
+ void index( final EntityIndexOperation entityIdScope );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cb179d35/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java
index e594ad3..b25eca5 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java
@@ -45,16 +45,17 @@ public interface ReIndexService {
/**
* Perform an index rebuild
*
- * @param appId The applicationId to re-index, or all applications if absent
- * @param collection The collection name to re-index. Otherwise all collections in an app will be used.
- * @param cursor An optional cursor to resume processing
- * @param startTimestamp The time to start indexing from. All edges >= this time will be indexed.
+ * @param indexServiceRequestBuilder The builder to build the request
* @return
*/
- IndexResponse rebuildIndex( final Optional<UUID> appId, final Optional<String> collection, final Optional<String> cursor,
- final Optional<Long> startTimestamp);
+ IndexResponse rebuildIndex(final IndexServiceRequestBuilder indexServiceRequestBuilder);
+ /**
+ * Generate a builder for the index
+ * @return
+ */
+ IndexServiceRequestBuilder getBuilder();
/**
* The response when requesting a re-index operation
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cb179d35/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
index bd1bff9..a2fa09a 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
@@ -20,7 +20,6 @@
package org.apache.usergrid.corepersistence.index;
-import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
@@ -28,7 +27,6 @@ import org.apache.usergrid.corepersistence.rx.impl.AllApplicationsObservable;
import org.apache.usergrid.corepersistence.rx.impl.AllEntityIdsObservable;
import org.apache.usergrid.corepersistence.rx.impl.EdgeScope;
import org.apache.usergrid.corepersistence.util.CpNamingUtils;
-import org.apache.usergrid.corepersistence.util.SerializableMapper;
import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
import org.apache.usergrid.persistence.core.rx.RxTaskScheduler;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
@@ -46,13 +44,11 @@ import com.google.inject.Singleton;
import rx.Observable;
import rx.observables.ConnectableObservable;
-import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getApplicationScope;
-
@Singleton
public class ReIndexServiceImpl implements ReIndexService {
- private static final MapScope RESUME_MAP_SCOPTE =
+ private static final MapScope RESUME_MAP_SCOPE =
new MapScopeImpl( CpNamingUtils.getManagementApplicationId(), "reindexresume" );
//Keep cursors to resume re-index for 1 day. This is far beyond it's useful real world implications anyway.
@@ -78,31 +74,41 @@ public class ReIndexServiceImpl implements ReIndexService {
this.rxTaskScheduler = rxTaskScheduler;
this.indexService = indexService;
- this.mapManager = mapManagerFactory.createMapManager( RESUME_MAP_SCOPTE );
+ this.mapManager = mapManagerFactory.createMapManager( RESUME_MAP_SCOPE );
}
+
+
@Override
- public IndexResponse rebuildIndex( final Optional<UUID> appId, final Optional<String> collection, final Optional<String> cursor,
- final Optional<Long> startTimestamp ) {
+ public IndexResponse rebuildIndex( final IndexServiceRequestBuilder indexServiceRequestBuilder ) {
- //load our last emitted Scope if a cursor is present
- if ( cursor.isPresent() ) {
+ //load our last emitted Scope if a cursor is present
+ if ( indexServiceRequestBuilder.getCursor().isPresent() ) {
throw new UnsupportedOperationException( "Build this" );
}
- final Observable<ApplicationScope> applicationScopes = appId.isPresent()? Observable.just( getApplicationScope(appId.get()) ) : allApplicationsObservable.getData();
+ final Optional<ApplicationScope> appId = indexServiceRequestBuilder.getApplicationScope();
+ final Observable<ApplicationScope> applicationScopes = appId.isPresent()? Observable.just( appId.get() ) : allApplicationsObservable.getData();
+
+
+
final String newCursor = StringUtils.sanitizeUUID( UUIDGenerator.newTimeUUID() );
+ final long modifiedSince = indexServiceRequestBuilder.getUpdateTimestamp().or( Long.MIN_VALUE );
+
//create an observable that loads each entity and indexes it, start it running with publish
final ConnectableObservable<EdgeScope> runningReIndex =
- allEntityIdsObservable.getEdgesToEntities( applicationScopes, collection, startTimestamp )
+ allEntityIdsObservable.getEdgesToEntities( applicationScopes,
+ indexServiceRequestBuilder.getCollectionName() )
//for each edge, create our scope and index on it
- .doOnNext( edge -> indexService.index( new EntityIdScope( edge.getApplicationScope(), edge.getEdge().getTargetNode() ) ) ).publish();
+ .doOnNext( edge -> indexService.index(
+ new EntityIndexOperation( edge.getApplicationScope(), edge.getEdge().getTargetNode(),
+ modifiedSince ) ) ).publish();
@@ -112,9 +118,9 @@ public class ReIndexServiceImpl implements ReIndexService {
rxTaskScheduler.getAsyncIOScheduler() )
.doOnNext( edge -> {
- final String serializedState = SerializableMapper.asString( edge );
-
- mapManager.putString( newCursor, serializedState, INDEX_TTL );
+// final String serializedState = SerializableMapper.asString( edge );
+//
+// mapManager.putString( newCursor, serializedState, INDEX_TTL );
} ).subscribe();
@@ -124,6 +130,12 @@ public class ReIndexServiceImpl implements ReIndexService {
return new IndexResponse( newCursor, runningReIndex );
}
+
+
+ @Override
+ public IndexServiceRequestBuilder getBuilder() {
+ return new IndexServiceRequestBuilderImpl();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cb179d35/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservable.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservable.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservable.java
index b9e5373..aada240 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservable.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservable.java
@@ -44,9 +44,8 @@ public interface AllEntityIdsObservable {
* Get all edges that represent edges to entities in the system
* @param appScopes
* @param edgeType The edge type to use (if specified)
- * @param startTime The time to start with
* @return
*/
- Observable<EdgeScope> getEdgesToEntities(final Observable<ApplicationScope> appScopes, final Optional<String> edgeType, final Optional<Long> startTime);
+ Observable<EdgeScope> getEdgesToEntities(final Observable<ApplicationScope> appScopes, final Optional<String> edgeType);
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cb179d35/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservableImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservableImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservableImpl.java
index 257fab1..6a95e7b 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservableImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/rx/impl/AllEntityIdsObservableImpl.java
@@ -81,12 +81,13 @@ public class AllEntityIdsObservableImpl implements AllEntityIdsObservable {
@Override
- public Observable<EdgeScope> getEdgesToEntities( final Observable<ApplicationScope> appScopes, final Optional<String> edgeType, final Optional<Long> startTime) {
+ public Observable<EdgeScope> getEdgesToEntities( final Observable<ApplicationScope> appScopes, final Optional<String> edgeType) {
return appScopes.flatMap( applicationScope -> {
final GraphManager gm = graphManagerFactory.createEdgeManager( applicationScope );
- return edgesObservable.edgesFromSourceAscending( gm, applicationScope.getApplication(),edgeType, startTime ).map( edge -> new EdgeScope(applicationScope, edge ));
+ return edgesObservable.edgesFromSourceDescending( gm, applicationScope.getApplication(), edgeType )
+ .map( edge -> new EdgeScope(applicationScope, edge ));
} );
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cb179d35/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/SerializableMapper.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/SerializableMapper.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/SerializableMapper.java
deleted file mode 100644
index 19ecf6d..0000000
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/SerializableMapper.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.usergrid.corepersistence.util;
-
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.Serializable;
-import java.nio.charset.StandardCharsets;
-
-import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
-import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.dataformat.smile.SmileFactory;
-import com.google.common.base.Preconditions;
-
-
-/**
- * A simple utility for serializing serializable classes to/and from strings. To be used for small object storage only, such as resume on re-index
- * storing data such as entities should be specialized.
- */
-public class SerializableMapper {
-
- private static final SmileFactory SMILE_FACTORY = new SmileFactory();
-
- private static final ObjectMapper MAPPER = new ObjectMapper( SMILE_FACTORY );
-
- static{
- MAPPER.enableDefaultTypingAsProperty( ObjectMapper.DefaultTyping.JAVA_LANG_OBJECT, "@class" );
- SMILE_FACTORY.delegateToTextual( true );
- }
-
- /**
- * Get value as a string
- * @param toSerialize
- * @param <T>
- * @return
- */
- public static <T extends Serializable> String asString(final T toSerialize){
- try {
- return MAPPER.writeValueAsString( toSerialize );
- }
- catch ( JsonProcessingException e ) {
- throw new RuntimeException( "Unable to process json", e );
- }
- }
-
-
- /**
- * Write the value as a string
- * @param <T>
- * @param serialized
- * @param clazz
- * @return
- */
- public static <T extends Serializable> T fromString(final String serialized, final Class<T> clazz){
- Preconditions.checkNotNull(serialized, "serialized string cannot be null");
-
-
- InputStream stream = new ByteArrayInputStream(serialized.getBytes( StandardCharsets.UTF_8));
-
- try {
- return MAPPER.readValue( stream, clazz );
- }
- catch ( IOException e ) {
- throw new RuntimeException( String.format("Unable to parse string '%s'", serialized), e );
- }
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cb179d35/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java
index 92f2b01..9e84219 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java
@@ -89,7 +89,7 @@ public class EdgesToTargetObservableIT extends AbstractCoreIT {
final GraphManager gm = managerCache.getGraphManager( scope );
- edgesFromSourceObservable.edgesFromSourceAscending( gm, applicationId ).doOnNext( edge -> {
+ edgesFromSourceObservable.edgesFromSourceDescending( gm, applicationId ).doOnNext( edge -> {
final String edgeType = edge.getType();
final Id target = edge.getTargetNode();
@@ -118,7 +118,7 @@ public class EdgesToTargetObservableIT extends AbstractCoreIT {
//test connections
- edgesFromSourceObservable.edgesFromSourceAscending( gm, source ).doOnNext( edge -> {
+ edgesFromSourceObservable.edgesFromSourceDescending( gm, source ).doOnNext( edge -> {
final String edgeType = edge.getType();
final Id target = edge.getTargetNode();
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cb179d35/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java
index 9f0bd60..964e13d 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/EdgesObservable.java
@@ -38,7 +38,7 @@ public interface EdgesObservable {
* @param sourceNode
* @return
*/
- Observable<Edge> edgesFromSourceAscending( final GraphManager gm, final Id sourceNode );
+ Observable<Edge> edgesFromSourceDescending( final GraphManager gm, final Id sourceNode );
/**
@@ -46,11 +46,10 @@ public interface EdgesObservable {
* @param gm
* @param sourceNode
* @param edgeType The edge type if specified. Otherwise all types will be used
- * @param startTimestamp The start timestamp if specfiied, otherwise Long.MIN will be used
* @return
*/
- Observable<Edge> edgesFromSourceAscending( final GraphManager gm, final Id sourceNode,final Optional<String> edgeType,
- final Optional<Long> startTimestamp );
+ Observable<Edge> edgesFromSourceDescending( final GraphManager gm, final Id sourceNode,
+ final Optional<String> edgeType );
/**
* Get all edges from the source node with the target type
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cb179d35/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java
index df9e094..7240798 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/EdgesObservableImpl.java
@@ -55,7 +55,7 @@ public class EdgesObservableImpl implements EdgesObservable {
* Get all edges from the source
*/
@Override
- public Observable<Edge> edgesFromSourceAscending( final GraphManager gm, final Id sourceNode ) {
+ public Observable<Edge> edgesFromSourceDescending( final GraphManager gm, final Id sourceNode ) {
final Observable<String> edgeTypes =
gm.getEdgeTypesFromSource( new SimpleSearchEdgeType( sourceNode, null, null ) );
@@ -71,8 +71,8 @@ public class EdgesObservableImpl implements EdgesObservable {
@Override
- public Observable<Edge> edgesFromSourceAscending( final GraphManager gm, final Id sourceNode, final Optional<String> edgeTypeInput,
- final Optional<Long> startTimestamp ) {
+ public Observable<Edge> edgesFromSourceDescending( final GraphManager gm, final Id sourceNode,
+ final Optional<String> edgeTypeInput ) {
@@ -85,7 +85,7 @@ public class EdgesObservableImpl implements EdgesObservable {
logger.debug( "Loading edges of edgeType {} from {}", edgeType, sourceNode );
return gm.loadEdgesFromSource(
- new SimpleSearchByEdgeType( sourceNode, edgeType, startTimestamp.or( Long.MIN_VALUE ), SearchByEdgeType.Order.ASCENDING,
+ new SimpleSearchByEdgeType( sourceNode, edgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
Optional.<Edge>absent() ) );
} );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cb179d35/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/TargetIdObservableImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/TargetIdObservableImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/TargetIdObservableImpl.java
index 5cf5117..82c7d54 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/TargetIdObservableImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/TargetIdObservableImpl.java
@@ -55,7 +55,7 @@ public class TargetIdObservableImpl implements TargetIdObservable {
public Observable<Id> getTargetNodes(final GraphManager gm, final Id sourceNode) {
//only search edge types that start with collections
- return edgesFromSourceObservable.edgesFromSourceAscending( gm, sourceNode ).map( new Func1<Edge, Id>() {
+ return edgesFromSourceObservable.edgesFromSourceDescending( gm, sourceNode ).map( new Func1<Edge, Id>() {
@Override
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cb179d35/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/EdgeDataMigrationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/EdgeDataMigrationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/EdgeDataMigrationImpl.java
index 0df26ff..d6c42e3 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/EdgeDataMigrationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/migration/EdgeDataMigrationImpl.java
@@ -87,7 +87,7 @@ public class EdgeDataMigrationImpl implements DataMigration<GraphNode> {
final GraphManager gm = graphManagerFactory.createEdgeManager( graphNode.applicationScope );
//get edges from the source
- return edgesFromSourceObservable.edgesFromSourceAscending( gm, graphNode.entryNode ).buffer( 1000 )
+ return edgesFromSourceObservable.edgesFromSourceDescending( gm, graphNode.entryNode ).buffer( 1000 )
.doOnNext( edges -> {
final MutationBatch batch = keyspace.prepareMutationBatch();
[05/10] incubator-usergrid git commit: Merge branch 'two-dot-o-dev'
of https://git-wip-us.apache.org/repos/asf/incubator-usergrid into
USERGRID-643
Posted by gr...@apache.org.
Merge branch 'two-dot-o-dev' of https://git-wip-us.apache.org/repos/asf/incubator-usergrid into USERGRID-643
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/a767bb61
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/a767bb61
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/a767bb61
Branch: refs/heads/two-dot-o-dev
Commit: a767bb61e68525006202a447717e6592f0dc0c8e
Parents: 20c9b35 5b1dfa1
Author: Todd Nine <tn...@apigee.com>
Authored: Thu May 14 17:19:41 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Thu May 14 17:19:41 2015 -0600
----------------------------------------------------------------------
.../corepersistence/CpEntityManager.java | 22 ++---
.../corepersistence/CpEntityManagerFactory.java | 8 +-
.../corepersistence/CpRelationManager.java | 90 ++++++++++----------
.../corepersistence/index/IndexServiceImpl.java | 6 +-
.../usergrid/persistence/EntityManager.java | 34 ++++----
.../persistence/MultiQueryIterator.java | 4 +-
.../apache/usergrid/persistence/PathQuery.java | 3 +-
.../usergrid/persistence/RelationManager.java | 12 +--
.../usergrid/persistence/CollectionIT.java | 2 +-
.../usergrid/persistence/CountingMutatorIT.java | 6 +-
.../persistence/EntityConnectionsIT.java | 26 +++---
.../org/apache/usergrid/persistence/GeoIT.java | 8 +-
.../PerformanceEntityRebuildIndexTest.java | 8 +-
.../persistence/query/ConnectionHelper.java | 2 +-
.../persistence/query/IteratingQueryIT.java | 2 +-
.../resources/usergrid-custom-test.properties | 2 +
.../persistence/index/impl/IndexingUtils.java | 2 +-
.../org/apache/usergrid/rest/ApiResponse.java | 16 ++++
.../apache/usergrid/rest/ApiResponseTest.java | 45 ++++++++++
.../rest/management/ImportResourceIT.java | 2 +-
.../cassandra/ManagementServiceImpl.java | 23 ++---
.../management/export/ExportServiceImpl.java | 4 +-
.../management/importer/FileImportJob.java | 5 +-
.../management/importer/ImportServiceImpl.java | 15 ++--
.../services/AbstractConnectionsService.java | 24 +++---
.../users/activities/ActivitiesService.java | 4 +-
.../users/activities/ActivitiesService.java | 4 +-
.../apache/usergrid/management/EmailFlowIT.java | 9 +-
.../management/importer/ImportCollectionIT.java | 2 +-
.../importer/ImportConnectionsTest.java | 9 +-
.../management/importer/ImportServiceIT.java | 6 +-
31 files changed, 224 insertions(+), 181 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a767bb61/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManagerFactory.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/a767bb61/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
----------------------------------------------------------------------
[06/10] incubator-usergrid git commit: Fixes bugs and cleans up tests
Posted by gr...@apache.org.
Fixes bugs and cleans up tests
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/48be894f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/48be894f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/48be894f
Branch: refs/heads/two-dot-o-dev
Commit: 48be894fb8af857f33700fcc2717357936d12913
Parents: a767bb6
Author: Todd Nine <tn...@apigee.com>
Authored: Thu May 14 19:23:04 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Thu May 14 19:23:04 2015 -0600
----------------------------------------------------------------------
.../usergrid/corepersistence/CoreModule.java | 4 +
.../corepersistence/CpRelationManager.java | 1 -
.../index/IndexProcessorFig.java | 2 +-
.../index/IndexServiceRequestBuilderImpl.java | 74 ++---
.../corepersistence/index/ReIndexService.java | 16 +-
.../index/ReIndexServiceImpl.java | 48 ++-
.../cursor/AbstractCursorSerializer.java | 2 +-
.../PerformanceEntityRebuildIndexTest.java | 318 +++++++++----------
8 files changed, 225 insertions(+), 240 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/48be894f/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
index a02bffd..b22a7cb 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CoreModule.java
@@ -25,6 +25,8 @@ import org.apache.usergrid.corepersistence.asyncevents.EventBuilderImpl;
import org.apache.usergrid.corepersistence.index.IndexProcessorFig;
import org.apache.usergrid.corepersistence.index.IndexService;
import org.apache.usergrid.corepersistence.index.IndexServiceImpl;
+import org.apache.usergrid.corepersistence.index.ReIndexService;
+import org.apache.usergrid.corepersistence.index.ReIndexServiceImpl;
import org.apache.usergrid.corepersistence.migration.AppInfoMigrationPlugin;
import org.apache.usergrid.corepersistence.migration.CoreMigration;
import org.apache.usergrid.corepersistence.migration.CoreMigrationPlugin;
@@ -142,6 +144,8 @@ public class CoreModule extends AbstractModule {
bind( AsyncEventService.class ).toProvider( AsyncIndexProvider.class );
+ bind( ReIndexService.class).to( ReIndexServiceImpl.class );
+
install( new GuicyFigModule( IndexProcessorFig.class ) );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/48be894f/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index 4993d88..cba1a07 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@ -661,7 +661,6 @@ public class CpRelationManager implements RelationManager {
}while (!found && length <= maxLength);
if(logger.isInfoEnabled()){
logger.info(String.format("Consistent Search finished in %s, results=%s, expected=%s...dumping stack",length, results.size(),expectedResults));
- Thread.dumpStack();
}
return results;
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/48be894f/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
index 8e835e2..e4b2329 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexProcessorFig.java
@@ -78,7 +78,7 @@ public interface IndexProcessorFig extends GuicyFig {
String getQueueImplementation();
- @Default("10000")
+ @Default("1000")
@Key("elasticsearch.reindex.flush.interval")
int getUpdateInterval();
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/48be894f/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceRequestBuilderImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceRequestBuilderImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceRequestBuilderImpl.java
index 3466674..4017b6e 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceRequestBuilderImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/IndexServiceRequestBuilderImpl.java
@@ -22,70 +22,57 @@ package org.apache.usergrid.corepersistence.index;
import java.util.UUID;
-
import org.apache.usergrid.corepersistence.util.CpNamingUtils;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import com.google.common.base.Optional;
+/**
+ * Index service request builder
+ */
public class IndexServiceRequestBuilderImpl implements IndexServiceRequestBuilder {
-
- /**
- *
- final Observable<ApplicationScope> applicationScopes = appId.isPresent()? Observable.just( getApplicationScope(appId.get()) ) : allApplicationsObservable.getData();
-
- final String newCursor = StringUtils.sanitizeUUID( UUIDGenerator.newTimeUUID() );
-
- //create an observable that loads each entity and indexes it, start it running with publish
- final ConnectableObservable<EdgeScope> runningReIndex =
- allEntityIdsObservable.getEdgesToEntities( applicationScopes, collection, startTimestamp )
-
- //for each edge, create our scope and index on it
- .doOnNext( edge -> indexService.index( new EntityIdScope( edge.getApplicationScope(), edge.getEdge().getTargetNode() ) ) ).publish();
-
-
-
- //start our sampler and state persistence
- //take a sample every sample interval to allow us to resume state with minimal loss
- runningReIndex.sample( indexProcessorFig.getReIndexSampleInterval(), TimeUnit.MILLISECONDS,
- rxTaskScheduler.getAsyncIOScheduler() )
- .doOnNext( edge -> {
-
- final String serializedState = SerializableMapper.asString( edge );
-
- mapManager.putString( newCursor, serializedState, INDEX_TTL );
- } ).subscribe();
-
-
- */
-
- private Optional<UUID> withApplicationId;
- private Optional<String> withCollectionName;
- private Optional<String> cursor;
- private Optional<Long> updateTimestamp;
+ private Optional<UUID> withApplicationId = Optional.absent();
+ private Optional<String> withCollectionName = Optional.absent();
+ private Optional<String> cursor = Optional.absent();
+ private Optional<Long> updateTimestamp = Optional.absent();
/***
*
- * @param applicationId
+ * @param applicationId The application id
* @return
*/
@Override
public IndexServiceRequestBuilder withApplicationId( final UUID applicationId ) {
- this.withApplicationId = Optional.fromNullable(applicationId);
+ this.withApplicationId = Optional.fromNullable( applicationId );
return this;
}
+ /**
+ * The collection name
+ * @param collectionName
+ * @return
+ */
@Override
public IndexServiceRequestBuilder withCollection( final String collectionName ) {
- this.withCollectionName = Optional.fromNullable( collectionName );
+ if(collectionName == null){
+ this.withCollectionName = Optional.absent();
+ }
+ else {
+ this.withCollectionName = Optional.fromNullable( CpNamingUtils.getEdgeTypeFromCollectionName( collectionName ) );
+ }
return this;
}
+ /**
+ * The cursor
+ * @param cursor
+ * @return
+ */
@Override
public IndexServiceRequestBuilder withCursor( final String cursor ) {
this.cursor = Optional.fromNullable( cursor );
@@ -93,9 +80,14 @@ public class IndexServiceRequestBuilderImpl implements IndexServiceRequestBuilde
}
+ /**
+ * Set start timestamp in epoch time. Only entities updated since this time will be processed for indexing
+ * @param timestamp
+ * @return
+ */
@Override
public IndexServiceRequestBuilder withStartTimestamp( final Long timestamp ) {
- this.updateTimestamp = Optional.fromNullable(timestamp );
+ this.updateTimestamp = Optional.fromNullable( timestamp );
return this;
}
@@ -103,8 +95,8 @@ public class IndexServiceRequestBuilderImpl implements IndexServiceRequestBuilde
@Override
public Optional<ApplicationScope> getApplicationScope() {
- if(this.withApplicationId.isPresent()){
- return Optional.of( CpNamingUtils.getApplicationScope( withApplicationId.get()));
+ if ( this.withApplicationId.isPresent() ) {
+ return Optional.of( CpNamingUtils.getApplicationScope( withApplicationId.get() ) );
}
return Optional.absent();
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/48be894f/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java
index f8955dd..af3615e 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java
@@ -31,7 +31,7 @@ public interface ReIndexService {
*
* @param indexServiceRequestBuilder The builder to build the request
*/
- IndexResponse rebuildIndex( final IndexServiceRequestBuilder indexServiceRequestBuilder );
+ ReIndexStatus rebuildIndex( final IndexServiceRequestBuilder indexServiceRequestBuilder );
/**
@@ -45,20 +45,20 @@ public interface ReIndexService {
* @param jobId The jobId returned during the rebuild index
* @return
*/
- IndexResponse getStatus( final String jobId );
+ ReIndexStatus getStatus( final String jobId );
/**
* The response when requesting a re-index operation
*/
- class IndexResponse {
+ class ReIndexStatus {
final String jobId;
- final String status;
+ final Status status;
final long numberProcessed;
final long lastUpdated;
- public IndexResponse( final String jobId, final String status, final long numberProcessed,
+ public ReIndexStatus( final String jobId, final Status status, final long numberProcessed,
final long lastUpdated ) {
this.jobId = jobId;
this.status = status;
@@ -97,8 +97,12 @@ public interface ReIndexService {
* Get the status
* @return
*/
- public String getStatus() {
+ public Status getStatus() {
return status;
}
}
+
+ enum Status{
+ STARTED, INPROGRESS, COMPLETE;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/48be894f/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
index d828fc2..f44113b 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java
@@ -22,6 +22,10 @@ package org.apache.usergrid.corepersistence.index;
import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.usergrid.corepersistence.asyncevents.AsyncEventService;
import org.apache.usergrid.corepersistence.pipeline.cursor.CursorSerializerUtil;
import org.apache.usergrid.corepersistence.pipeline.read.CursorSeek;
@@ -51,6 +55,8 @@ import rx.schedulers.Schedulers;
@Singleton
public class ReIndexServiceImpl implements ReIndexService {
+ private static final Logger logger = LoggerFactory.getLogger( ReIndexServiceImpl.class );
+
private static final MapScope RESUME_MAP_SCOPE =
new MapScopeImpl( CpNamingUtils.getManagementApplicationId(), "reindexresume" );
@@ -85,7 +91,7 @@ public class ReIndexServiceImpl implements ReIndexService {
@Override
- public IndexResponse rebuildIndex( final IndexServiceRequestBuilder indexServiceRequestBuilder ) {
+ public ReIndexStatus rebuildIndex( final IndexServiceRequestBuilder indexServiceRequestBuilder ) {
//load our last emitted Scope if a cursor is present
@@ -97,7 +103,7 @@ public class ReIndexServiceImpl implements ReIndexService {
final Optional<ApplicationScope> appId = indexServiceRequestBuilder.getApplicationScope();
- Preconditions.checkArgument( cursor.isPresent() && appId.isPresent(),
+ Preconditions.checkArgument( !(cursor.isPresent() && appId.isPresent()),
"You cannot specify an app id and a cursor. When resuming with cursor you must omit the appid" );
final Observable<ApplicationScope> applicationScopes = getApplications( cursor, appId );
@@ -112,9 +118,14 @@ public class ReIndexServiceImpl implements ReIndexService {
indexServiceRequestBuilder.getCollectionName(), cursorSeek.getSeekValue() )
//for each edge, create our scope and index on it
- .doOnNext( edge -> indexService.index(
- new EntityIndexOperation( edge.getApplicationScope(), edge.getEdge().getTargetNode(),
- modifiedSince ) ) );
+ .doOnNext( edge -> {
+ final EntityIndexOperation entityIndexOperation = new EntityIndexOperation( edge.getApplicationScope(), edge.getEdge().getTargetNode(), modifiedSince );
+
+ logger.info( "Queueing {}", entityIndexOperation );
+
+ indexService.index(entityIndexOperation);
+
+ } );
//start our sampler and state persistence
@@ -127,7 +138,7 @@ public class ReIndexServiceImpl implements ReIndexService {
.subscribeOn( Schedulers.io() ).subscribe();
- return new IndexResponse( jobId, "Started", 0, 0 );
+ return new ReIndexStatus( jobId, Status.STARTED, 0, 0 );
}
@@ -138,7 +149,7 @@ public class ReIndexServiceImpl implements ReIndexService {
@Override
- public IndexResponse getStatus( final String jobId ) {
+ public ReIndexStatus getStatus( final String jobId ) {
Preconditions.checkNotNull( jobId, "jobId must not be null" );
return getIndexResponse( jobId );
}
@@ -166,11 +177,11 @@ public class ReIndexServiceImpl implements ReIndexService {
writeCursorState( jobId, buffer.get( buffer.size() - 1 ) );
}
- writeStateMeta( jobId, "InProgress", count, System.currentTimeMillis() );
+ writeStateMeta( jobId, Status.INPROGRESS, count, System.currentTimeMillis() );
}
public void complete(){
- writeStateMeta( jobId, "Complete", count, System.currentTimeMillis() );
+ writeStateMeta( jobId, Status.COMPLETE, count, System.currentTimeMillis() );
}
}
@@ -257,10 +268,15 @@ public class ReIndexServiceImpl implements ReIndexService {
* @param processedCount
* @param lastUpdated
*/
- private void writeStateMeta( final String jobId, final String status, final long processedCount,
+ private void writeStateMeta( final String jobId, final Status status, final long processedCount,
final long lastUpdated ) {
- mapManager.putString( jobId + MAP_STATUS_KEY, status );
+ if(logger.isDebugEnabled()) {
+ logger.debug( "Flushing state for jobId {}, status {}, processedCount {}, lastUpdated {}",
+ new Object[] { jobId, status, processedCount, lastUpdated } );
+ }
+
+ mapManager.putString( jobId + MAP_STATUS_KEY, status.name() );
mapManager.putLong( jobId + MAP_COUNT_KEY, processedCount );
mapManager.putLong( jobId + MAP_UPDATED_KEY, lastUpdated );
}
@@ -271,18 +287,20 @@ public class ReIndexServiceImpl implements ReIndexService {
* @param jobId
* @return
*/
- private IndexResponse getIndexResponse( final String jobId ) {
+ private ReIndexStatus getIndexResponse( final String jobId ) {
- final String status = mapManager.getString( jobId+MAP_STATUS_KEY );
+ final String stringStatus = mapManager.getString( jobId+MAP_STATUS_KEY );
- if(status == null){
+ if(stringStatus == null){
throw new IllegalArgumentException( "Could not find a job with id " + jobId );
}
+ final Status status = Status.valueOf( stringStatus );
+
final long processedCount = mapManager.getLong( jobId + MAP_COUNT_KEY );
final long lastUpdated = mapManager.getLong( jobId + MAP_COUNT_KEY );
- return new IndexResponse( jobId, status, processedCount, lastUpdated );
+ return new ReIndexStatus( jobId, status, processedCount, lastUpdated );
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/48be894f/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/AbstractCursorSerializer.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/AbstractCursorSerializer.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/AbstractCursorSerializer.java
index 23bb99a..e770a77 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/AbstractCursorSerializer.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/pipeline/cursor/AbstractCursorSerializer.java
@@ -44,7 +44,7 @@ public abstract class AbstractCursorSerializer<T> implements CursorSerializer<T>
try {
final Class<? extends T> classType = getType();
- return objectMapper.treeToValue( node, classType );
+ return objectMapper.treeToValue( node, classType );
}
catch ( JsonProcessingException e ) {
throw new CursorParseException( "Unable to deserialize value", e );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/48be894f/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java b/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
index 55c4846..8d54043 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/PerformanceEntityRebuildIndexTest.java
@@ -22,78 +22,67 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import com.google.common.base.Optional;
-import org.apache.commons.lang.RandomStringUtils;
-
-import org.apache.usergrid.corepersistence.index.IndexServiceRequestBuilder;
-import org.apache.usergrid.corepersistence.index.ReIndexService;
-import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.commons.lang.RandomStringUtils;
+
import org.apache.usergrid.AbstractCoreIT;
import org.apache.usergrid.cassandra.SpringResource;
+import org.apache.usergrid.corepersistence.index.IndexServiceRequestBuilder;
+import org.apache.usergrid.corepersistence.index.ReIndexService;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
+import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
import org.apache.usergrid.persistence.index.EntityIndexFactory;
import org.apache.usergrid.persistence.model.entity.Id;
import org.apache.usergrid.persistence.model.entity.SimpleId;
-import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
-import com.codahale.metrics.Slf4jReporter;
import com.google.inject.Injector;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;
//@RunWith(JukitoRunner.class)
//@UseModules({ GuiceModule.class })
+
public class PerformanceEntityRebuildIndexTest extends AbstractCoreIT {
- private static final Logger logger = LoggerFactory.getLogger(PerformanceEntityRebuildIndexTest.class );
+ private static final Logger logger = LoggerFactory.getLogger( PerformanceEntityRebuildIndexTest.class );
private static final MetricRegistry registry = new MetricRegistry();
- private Slf4jReporter reporter;
-
- private static final int ENTITIES_TO_INDEX = 2000;
+ private static final int ENTITIES_TO_INDEX = 1000;
@Before
public void startReporting() {
- logger.debug("Starting metrics reporting");
- reporter = Slf4jReporter.forRegistry( registry ).outputTo( logger )
- .convertRatesTo( TimeUnit.SECONDS )
- .convertDurationsTo( TimeUnit.MILLISECONDS ).build();
-
- reporter.start( 10, TimeUnit.SECONDS );
+ logger.debug( "Starting metrics reporting" );
}
@After
public void printReport() {
- logger.debug("Printing metrics report");
- reporter.report();
- reporter.stop();
+ logger.debug( "Printing metrics report" );
}
- @Test
+ @Test( timeout = 120000 )
public void rebuildOneCollectionIndex() throws Exception {
- logger.info("Started rebuildIndex()");
+ logger.info( "Started rebuildIndex()" );
- String rand = RandomStringUtils.randomAlphanumeric(5);
- final UUID appId = setup.createApplication("org_" + rand, "app_" + rand);
+ String rand = RandomStringUtils.randomAlphanumeric( 5 );
+ final UUID appId = setup.createApplication( "org_" + rand, "app_" + rand );
final EntityManager em = setup.getEmf().getEntityManager( appId );
@@ -102,109 +91,80 @@ public class PerformanceEntityRebuildIndexTest extends AbstractCoreIT {
// ----------------- create a bunch of entities
Map<String, Object> entityMap = new HashMap<String, Object>() {{
- put("key1", 1000 );
- put("key2", 2000 );
- put("key3", "Some value");
+ put( "key1", 1000 );
+ put( "key2", 2000 );
+ put( "key3", "Some value" );
}};
List<EntityRef> entityRefs = new ArrayList<EntityRef>();
- int herderCount = 0;
+ int herderCount = 0;
int shepardCount = 0;
- for (int i = 0; i < ENTITIES_TO_INDEX; i++) {
+ for ( int i = 0; i < ENTITIES_TO_INDEX; i++ ) {
final Entity entity;
try {
- entityMap.put("key", i );
+ entityMap.put( "key", i );
if ( i % 2 == 0 ) {
- entity = em.create("catherder", entityMap);
+ entity = em.create( "catherder", entityMap );
herderCount++;
- } else {
- entity = em.create("catshepard", entityMap);
+ }
+ else {
+ entity = em.create( "catshepard", entityMap );
shepardCount++;
}
-
- app.refreshIndex();
-
-// em.createConnection(entity, "herds", cat1);
-// em.createConnection(entity, "herds", cat2);
-// em.createConnection(entity, "herds", cat3);
-
- } catch (Exception ex) {
- throw new RuntimeException("Error creating entity", ex);
+ }
+ catch ( Exception ex ) {
+ throw new RuntimeException( "Error creating entity", ex );
}
- entityRefs.add(new SimpleEntityRef( entity.getType(), entity.getUuid() ) );
+ entityRefs.add( new SimpleEntityRef( entity.getType(), entity.getUuid() ) );
if ( i % 10 == 0 ) {
- logger.info("Created {} entities", i );
+ logger.info( "Created {} entities", i );
}
-
}
- logger.info("Created {} entities", ENTITIES_TO_INDEX);
+ logger.info( "Created {} entities", ENTITIES_TO_INDEX );
app.refreshIndex();
// ----------------- test that we can read them, should work fine
- logger.debug("Read the data");
- readData( em, "catherders", herderCount, 0);
- readData( em, "catshepards", shepardCount, 0);
+ logger.debug( "Read the data" );
+ readData( em, "catherders", herderCount, 0 );
+ readData( em, "catshepards", shepardCount, 0 );
// ----------------- delete the system and application indexes
- logger.debug("Deleting apps");
+ logger.debug( "Deleting apps" );
deleteIndex( em.getApplicationId() );
// ----------------- test that we can read them, should fail
- logger.debug("Reading data, should fail this time ");
- try {
- readData( em, "testTypes", ENTITIES_TO_INDEX, 0 );
- fail("should have failed to read data");
+ logger.debug( "Reading data, should fail this time " );
- } catch (Exception expected) {}
+ //should be no data
+ readData( em, "testTypes", 0, 0 );
-// ----------------- rebuild index for catherders only
- logger.debug("Preparing to rebuild all indexes");;
+ // ----------------- rebuild index for catherders only
- final String meterName = this.getClass().getSimpleName() + ".rebuildIndex";
- final Meter meter = registry.meter( meterName );
+ logger.debug( "Preparing to rebuild all indexes" );
- EntityManagerFactory.ProgressObserver po = new EntityManagerFactory.ProgressObserver() {
- int counter = 0;
- @Override
- public void onProgress( final EntityRef entity ) {
-
- meter.mark();
- logger.debug("Indexing {}:{}", entity.getType(), entity.getUuid());
- if ( counter % 100 == 0 ) {
- logger.info("Reindexed {} entities", counter );
- }
- counter++;
- }
+ final IndexServiceRequestBuilder builder =
+ reIndexService.getBuilder().withApplicationId( em.getApplicationId() ).withCollection( "catherders" );
+ ReIndexService.ReIndexStatus status = reIndexService.rebuildIndex( builder );
+ assertNotNull( status.getJobId(), "JobId is present" );
- };
-
- try {
+ logger.info( "Rebuilt index" );
- final IndexServiceRequestBuilder builder = reIndexService.getBuilder().withApplicationId( em.getApplicationId() ).withCollection( "catherders" );
- reIndexService.rebuildIndex(builder );
+ waitForRebuild( status, reIndexService );
- reporter.report();
- registry.remove( meterName );
- logger.info("Rebuilt index");
-
- } catch (Exception ex) {
- logger.error("Error rebuilding index", ex);
- fail();
- }
// ----------------- test that we can read the catherder collection and not the catshepard
@@ -213,78 +173,79 @@ public class PerformanceEntityRebuildIndexTest extends AbstractCoreIT {
}
- @Test
+ @Test( timeout = 120000 )
public void rebuildIndex() throws Exception {
- logger.info("Started rebuildIndex()");
+ logger.info( "Started rebuildIndex()" );
+
+ String rand = RandomStringUtils.randomAlphanumeric( 5 );
+ final UUID appId = setup.createApplication( "org_" + rand, "app_" + rand );
- String rand = RandomStringUtils.randomAlphanumeric(5);
- final UUID appId = setup.createApplication("org_" + rand, "app_" + rand);
+ final EntityManager em = setup.getEmf().getEntityManager( appId );
- final EntityManager em = setup.getEmf().getEntityManager(appId);
+ final ReIndexService reIndexService = setup.getInjector().getInstance( ReIndexService.class );
// ----------------- create a bunch of entities
Map<String, Object> entityMap = new HashMap<String, Object>() {{
- put("key1", 1000 );
- put("key2", 2000 );
- put("key3", "Some value");
+ put( "key1", 1000 );
+ put( "key2", 2000 );
+ put( "key3", "Some value" );
}};
Map<String, Object> cat1map = new HashMap<String, Object>() {{
- put("name", "enzo");
- put("color", "orange");
+ put( "name", "enzo" );
+ put( "color", "orange" );
}};
Map<String, Object> cat2map = new HashMap<String, Object>() {{
- put("name", "marquee");
- put("color", "grey");
+ put( "name", "marquee" );
+ put( "color", "grey" );
}};
Map<String, Object> cat3map = new HashMap<String, Object>() {{
- put("name", "bertha");
- put("color", "tabby");
+ put( "name", "bertha" );
+ put( "color", "tabby" );
}};
- Entity cat1 = em.create("cat", cat1map );
- Entity cat2 = em.create("cat", cat2map );
- Entity cat3 = em.create("cat", cat3map );
+ Entity cat1 = em.create( "cat", cat1map );
+ Entity cat2 = em.create( "cat", cat2map );
+ Entity cat3 = em.create( "cat", cat3map );
- List<EntityRef> entityRefs = new ArrayList<EntityRef>();
- int entityCount = 0;
- for (int i = 0; i < ENTITIES_TO_INDEX; i++) {
+ List<EntityRef> entityRefs = new ArrayList<>();
+
+ for ( int i = 0; i < ENTITIES_TO_INDEX; i++ ) {
final Entity entity;
try {
- entityMap.put("key", entityCount );
- entity = em.create("testType", entityMap );
+ entityMap.put( "key", i );
+ entity = em.create( "testType", entityMap );
- em.createConnection(entity, "herds", cat1);
- em.createConnection(entity, "herds", cat2);
- em.createConnection(entity, "herds", cat3);
-
- } catch (Exception ex) {
- throw new RuntimeException("Error creating entity", ex);
+ em.createConnection( entity, "herds", cat1 );
+ em.createConnection( entity, "herds", cat2 );
+ em.createConnection( entity, "herds", cat3 );
}
-
- entityRefs.add(new SimpleEntityRef( entity.getType(), entity.getUuid() ) );
- if ( entityCount % 10 == 0 ) {
- logger.info("Created {} entities", entityCount );
+ catch ( Exception ex ) {
+ throw new RuntimeException( "Error creating entity", ex );
}
+ entityRefs.add( new SimpleEntityRef( entity.getType(), entity.getUuid() ) );
+ if ( i % 10 == 0 ) {
+ logger.info( "Created {} entities", i );
+ }
}
- logger.info("Created {} entities", entityCount);
+ logger.info( "Created {} entities", ENTITIES_TO_INDEX );
app.refreshIndex();
- Thread.sleep(10000);
// ----------------- test that we can read them, should work fine
- logger.debug("Read the data");
- readData( em, "testType", entityCount, 3 );
+ logger.debug( "Read the data" );
+ final String collectionName = "testtypes";
+ readData( em, collectionName, ENTITIES_TO_INDEX, 3 );
// ----------------- delete the system and application indexes
- logger.debug("Deleting app index");
+ logger.debug( "Deleting app index" );
deleteIndex( em.getApplicationId() );
@@ -295,61 +256,73 @@ public class PerformanceEntityRebuildIndexTest extends AbstractCoreIT {
// ----------------- test that we can read them, should fail
- logger.debug("Reading data, should fail this time ");
- try {
- readData( em, "testTypes", entityCount, 3 );
- fail("should have failed to read data");
+ logger.debug( "Reading data, should fail this time " );
+
+ readData( em, collectionName, 0, 0 );
+
- } catch (Exception expected) {}
// ----------------- rebuild index
- logger.debug("Preparing to rebuild all indexes");;
+ logger.debug( "Preparing to rebuild all indexes" );
+ ;
- final String meterName = this.getClass().getSimpleName() + ".rebuildIndex";
- final Meter meter = registry.meter( meterName );
- EntityManagerFactory.ProgressObserver po = new EntityManagerFactory.ProgressObserver() {
- int counter = 0;
+ try {
- @Override
- public void onProgress( final EntityRef entity ) {
+ final IndexServiceRequestBuilder builder =
+ reIndexService.getBuilder().withApplicationId( em.getApplicationId() );
- meter.mark();
- logger.debug("Indexing {}:{}", entity.getType(), entity.getUuid());
- if ( counter % 100 == 0 ) {
- logger.info("Reindexed {} entities", counter );
- }
- counter++;
- }
+ ReIndexService.ReIndexStatus status = reIndexService.rebuildIndex( builder );
+ assertNotNull( status.getJobId(), "JobId is present" );
- };
+ logger.info( "Rebuilt index" );
- try {
- fail( "Implement index rebuild" );
-// setup.getEmf().rebuildInternalIndexes( po );
-//
-// setup.getEmf().rebuildApplicationIndexes( em.getApplicationId(), po );
+ waitForRebuild( status, reIndexService );
- reporter.report();
- registry.remove( meterName );
- logger.info("Rebuilt index");
- app.refreshIndex();
+ logger.info( "Rebuilt index" );
- } catch (Exception ex) {
- logger.error("Error rebuilding index", ex);
+ app.refreshIndex();
+ }
+ catch ( Exception ex ) {
+ logger.error( "Error rebuilding index", ex );
fail();
}
// ----------------- test that we can read them
- Thread.sleep(2000);
- readData( em, "testTypes", entityCount, 3 );
+ Thread.sleep( 2000 );
+ readData( em, collectionName, ENTITIES_TO_INDEX, 3 );
+ }
+
+
+ /**
+ * Wait for the rebuild to occur
+ */
+ private void waitForRebuild( final ReIndexService.ReIndexStatus status, final ReIndexService reIndexService )
+ throws InterruptedException {
+ while ( true ) {
+
+ try {
+ final ReIndexService.ReIndexStatus updatedStatus = reIndexService.getStatus( status.getJobId() );
+
+ if ( updatedStatus.getStatus() == ReIndexService.Status.COMPLETE ) {
+ break;
+ }
+ }
+ catch ( IllegalArgumentException iae ) {
+ //swallow
+ }
+
+
+ Thread.sleep( 1000 );
+ }
}
+
/**
* Delete app index
*/
@@ -360,54 +333,49 @@ public class PerformanceEntityRebuildIndexTest extends AbstractCoreIT {
Id appId = new SimpleId( appUuid, Schema.TYPE_APPLICATION );
ApplicationScope scope = new ApplicationScopeImpl( appId );
- ApplicationEntityIndex ei = eif.createApplicationEntityIndex(scope);
+ ApplicationEntityIndex ei = eif.createApplicationEntityIndex( scope );
- ei.deleteApplication().toBlocking().lastOrDefault(null);
+ ei.deleteApplication().toBlocking().lastOrDefault( null );
app.refreshIndex();
-
}
- private int readData( EntityManager em,
- String collectionName, int expectedEntities, int expectedConnections ) throws Exception {
+ private int readData( EntityManager em, String collectionName, int expectedEntities, int expectedConnections )
+ throws Exception {
app.refreshIndex();
- Query q = Query.fromQL("select * where key1=1000");
- q.setLimit(40);
- Results results = em.searchCollectionConsistent( em.getApplicationRef(), collectionName, q,expectedEntities );
+ Query q = Query.fromQL( "select * where key1=1000" ).withLimit( 1000 );
+ Results results = em.searchCollectionConsistent( em.getApplicationRef(), collectionName, q, expectedEntities );
int count = 0;
while ( true ) {
for ( Entity e : results.getEntities() ) {
- assertEquals( 2000, e.getProperty("key2"));
+ assertEquals( 2000, e.getProperty( "key2" ) );
- Results catResults = em.searchTargetEntities(e,
- Query.fromQL("select *").setConnectionType("herds"));
+ Results catResults =
+ em.searchTargetEntities( e, Query.fromQL( "select *" ).setConnectionType( "herds" ) );
assertEquals( expectedConnections, catResults.size() );
if ( count % 100 == 0 ) {
- logger.info( "read {} entities", count);
+ logger.info( "read {} entities", count );
}
count++;
}
if ( results.hasCursor() ) {
- logger.info( "Counted {} : query again with cursor", count);
+ logger.info( "Counted {} : query again with cursor", count );
q.setCursor( results.getCursor() );
results = em.searchCollection( em.getApplicationRef(), collectionName, q );
-
- } else {
+ }
+ else {
break;
}
}
- if ( expectedEntities != -1 && expectedEntities != count ) {
- throw new RuntimeException("Did not get expected "
- + expectedEntities + " entities, instead got " + count );
- }
+ assertEquals("Did not get expected entities", expectedEntities, count);
return count;
}
}