You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/03/30 20:03:23 UTC
[1/5] incubator-usergrid git commit: Fixes issue with throwable not
being caught in onSubscribe function
Repository: incubator-usergrid
Updated Branches:
refs/heads/USERGRID-509 abbdd6b48 -> fbce62df8
Fixes issue with throwable not being caught in onSubscribe function
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/2d1c8b8a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/2d1c8b8a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/2d1c8b8a
Branch: refs/heads/USERGRID-509
Commit: 2d1c8b8ac7b20b63a11d83adca56839d8b409cca
Parents: beb2a2a
Author: Todd Nine <tn...@apigee.com>
Authored: Thu Mar 26 09:47:58 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Thu Mar 26 09:47:58 2015 -0600
----------------------------------------------------------------------
.../index/impl/EsIndexBufferConsumerImpl.java | 39 +++++++-------------
1 file changed, 14 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/2d1c8b8a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
index 7e64de3..d064b97 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
@@ -153,12 +153,16 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
//name our thread so it's easy to see
Thread.currentThread().setName( "QueueConsumer_" + counter.incrementAndGet() );
- List<IndexOperationMessage> drainList;
+
+ List<IndexOperationMessage> drainList = null;
+
do {
- try {
+ Timer.Context timer = produceTimer.time();
+
+
+ try {
- Timer.Context timer = produceTimer.time();
drainList = bufferQueue
.take( config.getIndexBufferSize(), config.getIndexBufferTimeout(),
@@ -174,10 +178,15 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
timer.stop();
}
- catch ( Exception e ) {
+ catch ( Throwable t ) {
final long sleepTime = config.getFailureRetryTime();
- log.error( "Failed to dequeue. Sleeping for {} milliseconds", sleepTime, e );
+ log.error( "Failed to dequeue. Sleeping for {} milliseconds", sleepTime, t );
+
+ if ( drainList != null ) {
+ inFlight.addAndGet( -1 * drainList.size() );
+ }
+
try {
Thread.sleep( sleepTime );
@@ -216,26 +225,6 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
inFlight.addAndGet( -1 * indexOperationMessages.size() );
}
} )
- //catch an unexpected error, then emit an empty list to ensure our subscriber doesn't die
- .onErrorReturn( new Func1<Throwable, List<IndexOperationMessage>>() {
- @Override
- public List<IndexOperationMessage> call( final Throwable throwable ) {
- final long sleepTime = config.getFailureRetryTime();
-
- log.error( "Failed to dequeue. Sleeping for {} milliseconds", sleepTime, throwable );
-
- try {
- Thread.sleep( sleepTime );
- }
- catch ( InterruptedException ie ) {
- //swallow
- }
-
- indexErrorCounter.inc();
-
- return Collections.EMPTY_LIST;
- }
- } )
.subscribeOn( Schedulers.newThread() );
[3/5] incubator-usergrid git commit: Merged from 2.0
Posted by to...@apache.org.
Merged from 2.0
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/7975e4f3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/7975e4f3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/7975e4f3
Branch: refs/heads/USERGRID-509
Commit: 7975e4f3b204253a52a81c9e398f0e9e9ffb79c2
Parents: 07c5de0
Author: Todd Nine <tn...@apigee.com>
Authored: Fri Mar 27 09:47:29 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Fri Mar 27 09:47:29 2015 -0600
----------------------------------------------------------------------
.../persistence/index/impl/EsIndexBufferConsumerImpl.java | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/7975e4f3/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
index d064b97..6d5a4d8 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/impl/EsIndexBufferConsumerImpl.java
@@ -177,7 +177,8 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
timer.stop();
}
-
+ //DO NOT add any doOnError* functions to this subscription. We want the producer
+ //to receive these exceptions and sleep before a retry
catch ( Throwable t ) {
final long sleepTime = config.getFailureRetryTime();
@@ -187,7 +188,6 @@ public class EsIndexBufferConsumerImpl implements IndexBufferConsumer {
inFlight.addAndGet( -1 * drainList.size() );
}
-
try {
Thread.sleep( sleepTime );
}
[2/5] incubator-usergrid git commit: Merge branch 'two-dot-o' into
USERGRID-509
Posted by to...@apache.org.
Merge branch 'two-dot-o' into USERGRID-509
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/07c5de06
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/07c5de06
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/07c5de06
Branch: refs/heads/USERGRID-509
Commit: 07c5de066f44cef1228e830bf162b128f353c456
Parents: abbdd6b 2d1c8b8
Author: Todd Nine <tn...@apigee.com>
Authored: Fri Mar 27 09:46:27 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Fri Mar 27 09:46:27 2015 -0600
----------------------------------------------------------------------
.../index/impl/EsIndexBufferConsumerImpl.java | 39 +++++++-------------
1 file changed, 14 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
[4/5] incubator-usergrid git commit: Fixes hystrix thread pool size
issue
Posted by to...@apache.org.
Fixes hystrix thread pool size issue
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/cf3f7abe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/cf3f7abe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/cf3f7abe
Branch: refs/heads/USERGRID-509
Commit: cf3f7abee29d4bee17313d00fe3abf863260e685
Parents: 7975e4f
Author: Todd Nine <tn...@apigee.com>
Authored: Fri Mar 27 16:08:32 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Fri Mar 27 16:08:32 2015 -0600
----------------------------------------------------------------------
.../collection/mvcc/stage/write/WriteUniqueVerify.java | 4 ++--
.../impl/shard/count/NodeShardApproximationImpl.java | 11 ++++++++++-
2 files changed, 12 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cf3f7abe/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
index b984ad8..564d036 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/mvcc/stage/write/WriteUniqueVerify.java
@@ -218,6 +218,6 @@ public class WriteUniqueVerify implements Action1<CollectionIoEvent<MvccEntity>>
*/
public static final HystrixCommand.Setter
REPLAY_GROUP = HystrixCommand.Setter.withGroupKey(
- HystrixCommandGroupKey.Factory.asKey( "user" ) ).andThreadPoolPropertiesDefaults(
- HystrixThreadPoolProperties.Setter().withCoreSize( 1000 ) );
+ HystrixCommandGroupKey.Factory.asKey( "uniqueVerify" ) ).andThreadPoolPropertiesDefaults(
+ HystrixThreadPoolProperties.Setter().withCoreSize( 100 ) );
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/cf3f7abe/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationImpl.java
index a47d528..fceb32c 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/count/NodeShardApproximationImpl.java
@@ -39,6 +39,7 @@ import org.apache.usergrid.persistence.graph.serialization.impl.shard.Shard;
import com.netflix.astyanax.MutationBatch;
import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
+import com.netflix.hystrix.HystrixThreadPoolProperties;
import rx.functions.Action0;
import rx.schedulers.Schedulers;
@@ -75,6 +76,14 @@ public class NodeShardApproximationImpl implements NodeShardApproximation {
private final FlushWorker worker;
+ /**
+ * Command group used for realtime user commands
+ */
+ public static final HystrixCommand.Setter
+ COUNT_GROUP = HystrixCommand.Setter.withGroupKey(
+ HystrixCommandGroupKey.Factory.asKey( "BatchCounterRollup" ) ).andThreadPoolPropertiesDefaults(
+ HystrixThreadPoolProperties.Setter().withCoreSize( 100 ) );
+
/**
* Create a time shard approximation with the correct configuration.
@@ -229,7 +238,7 @@ public class NodeShardApproximationImpl implements NodeShardApproximation {
/**
* Execute the command in hystrix to avoid slamming cassandra
*/
- new HystrixCommand( HystrixCommandGroupKey.Factory.asKey("BatchCounterRollup") ) {
+ new HystrixCommand( COUNT_GROUP ) {
@Override
protected Void run() throws Exception {
[5/5] incubator-usergrid git commit: Refactored index scope
generation to be more consistent and clean
Posted by to...@apache.org.
Refactored index scope generation to be more consistent and clean
Moved some newly private methods to test utils
Added onStart event to the observable iterator
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/fbce62df
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/fbce62df
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/fbce62df
Branch: refs/heads/USERGRID-509
Commit: fbce62df84323ea19dce8a00fb6e52aa245e7bb5
Parents: cf3f7ab
Author: Todd Nine <tn...@apigee.com>
Authored: Mon Mar 30 09:45:46 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Mon Mar 30 12:03:10 2015 -0600
----------------------------------------------------------------------
.../corepersistence/CpEntityManager.java | 29 +---
.../corepersistence/CpRelationManager.java | 61 +++------
.../usergrid/corepersistence/CpWalker.java | 4 +-
.../events/EntityDeletedHandler.java | 31 ++---
.../events/EntityVersionCreatedHandler.java | 2 +-
.../events/EntityVersionDeletedHandler.java | 87 +++++++++---
.../results/FilteringLoader.java | 3 +-
.../corepersistence/util/CpNamingUtils.java | 134 ++++++++++++++++---
.../corepersistence/StaleIndexCleanupTest.java | 10 +-
.../rx/EdgesFromSourceObservableIT.java | 9 +-
.../rx/EdgesToTargetObservableIT.java | 53 ++++----
.../apache/usergrid/utils/EdgeTestUtils.java | 50 +++++++
.../persistence/core/rx/ObservableIterator.java | 2 +
13 files changed, 307 insertions(+), 168 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fbce62df/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
index c45c390..a7dda13 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpEntityManager.java
@@ -69,7 +69,6 @@ import org.apache.usergrid.persistence.collection.exception.WriteOptimisticVerif
import org.apache.usergrid.persistence.collection.exception.WriteUniqueVerifyException;
import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
-import org.apache.usergrid.persistence.core.util.Health;
import org.apache.usergrid.persistence.entities.Application;
import org.apache.usergrid.persistence.entities.Event;
import org.apache.usergrid.persistence.entities.Group;
@@ -80,10 +79,8 @@ import org.apache.usergrid.persistence.exceptions.EntityNotFoundException;
import org.apache.usergrid.persistence.exceptions.RequiredPropertyNotFoundException;
import org.apache.usergrid.persistence.exceptions.UnexpectedEntityTypeException;
import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
-import org.apache.usergrid.persistence.index.EntityIndex;
import org.apache.usergrid.persistence.index.EntityIndexBatch;
import org.apache.usergrid.persistence.index.IndexScope;
-import org.apache.usergrid.persistence.index.impl.IndexScopeImpl;
import org.apache.usergrid.persistence.index.query.CounterResolution;
import org.apache.usergrid.persistence.index.query.Identifier;
import org.apache.usergrid.persistence.index.query.Query;
@@ -104,7 +101,6 @@ import org.apache.usergrid.utils.UUIDUtils;
import com.codahale.metrics.Meter;
import com.codahale.metrics.Timer;
import com.google.common.base.Preconditions;
-import com.netflix.hystrix.exception.HystrixRuntimeException;
import me.prettyprint.hector.api.Keyspace;
import me.prettyprint.hector.api.beans.ColumnSlice;
@@ -129,7 +125,7 @@ import static me.prettyprint.hector.api.factory.HFactory.createMutator;
import static org.apache.commons.lang.StringUtils.capitalize;
import static org.apache.commons.lang.StringUtils.isBlank;
import static org.apache.usergrid.corepersistence.util.CpEntityMapUtils.entityToCpEntity;
-import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getCollectionScopeNameFromEntityType;
+import static org.apache.usergrid.corepersistence.util.CpNamingUtils.generateScopeFromCollection;
import static org.apache.usergrid.persistence.Schema.COLLECTION_ROLES;
import static org.apache.usergrid.persistence.Schema.COLLECTION_USERS;
import static org.apache.usergrid.persistence.Schema.DICTIONARY_PERMISSIONS;
@@ -602,15 +598,6 @@ public class CpEntityManager implements EntityManager {
catch ( WriteUniqueVerifyException wuve ) {
handleWriteUniqueVerifyException( entity, wuve );
}
- catch ( HystrixRuntimeException hre ) {
-
- if ( hre.getCause() instanceof WriteUniqueVerifyException ) {
- WriteUniqueVerifyException wuve = ( WriteUniqueVerifyException ) hre.getCause();
- handleWriteUniqueVerifyException( entity, wuve );
- }
-
- throw hre;
- }
// update in all containing collections and connection indexes
CpRelationManager rm = ( CpRelationManager ) getRelationManager( entity );
@@ -1040,10 +1027,6 @@ public class CpEntityManager implements EntityManager {
@Override
public void deleteProperty( EntityRef entityRef, String propertyName ) throws Exception {
- IndexScope defaultIndexScope = new IndexScopeImpl( getApplicationScope().getApplication(),
- getCollectionScopeNameFromEntityType( entityRef.getType() ) );
-
-
Id entityId = new SimpleId( entityRef.getUuid(), entityRef.getType() );
// if ( !UUIDUtils.isTimeBased( entityId.getUuid() ) ) {
@@ -2614,13 +2597,7 @@ public class CpEntityManager implements EntityManager {
catch ( WriteUniqueVerifyException wuve ) {
handleWriteUniqueVerifyException( entity, wuve );
}
- catch ( HystrixRuntimeException hre ) {
- if ( hre.getCause() instanceof WriteUniqueVerifyException ) {
- WriteUniqueVerifyException wuve = ( WriteUniqueVerifyException ) hre.getCause();
- handleWriteUniqueVerifyException( entity, wuve );
- }
- }
// Index CP entity into default collection scope
// IndexScope defaultIndexScope = new IndexScopeImpl(
@@ -2915,9 +2892,7 @@ public class CpEntityManager implements EntityManager {
final EntityIndexBatch batch = aie.createBatch();
// index member into entity collection | type scope
- IndexScope collectionIndexScope = new IndexScopeImpl( collectionEntity.getId(),
- CpNamingUtils.getCollectionScopeNameFromCollectionName( collName ) );
-
+ IndexScope collectionIndexScope = generateScopeFromCollection( collectionEntity.getId(), collName );
batch.index( collectionIndexScope, memberEntity );
//TODO REMOVE INDEX CODE
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fbce62df/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
index 3427684..b76f38f 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpRelationManager.java
@@ -79,7 +79,6 @@ import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
import org.apache.usergrid.persistence.index.EntityIndexBatch;
import org.apache.usergrid.persistence.index.IndexScope;
import org.apache.usergrid.persistence.index.SearchTypes;
-import org.apache.usergrid.persistence.index.impl.IndexScopeImpl;
import org.apache.usergrid.persistence.index.query.Identifier;
import org.apache.usergrid.persistence.index.query.Query;
import org.apache.usergrid.persistence.index.query.Query.Level;
@@ -118,6 +117,11 @@ import rx.functions.Func1;
import static java.util.Arrays.asList;
import static me.prettyprint.hector.api.factory.HFactory.createMutator;
+import static org.apache.usergrid.corepersistence.util.CpNamingUtils.createId;
+import static org.apache.usergrid.corepersistence.util.CpNamingUtils.generateScopeFromSource;
+import static org.apache.usergrid.corepersistence.util.CpNamingUtils.generateScopeFromCollection;
+import static org.apache.usergrid.corepersistence.util.CpNamingUtils.generateScopeFromConnection;
+import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getNameFromEdgeType;
import static org.apache.usergrid.persistence.Schema.COLLECTION_ROLES;
import static org.apache.usergrid.persistence.Schema.DICTIONARY_CONNECTED_ENTITIES;
import static org.apache.usergrid.persistence.Schema.DICTIONARY_CONNECTED_TYPES;
@@ -342,13 +346,7 @@ public class CpRelationManager implements RelationManager {
final EntityRef eref =
new SimpleEntityRef( edge.getSourceNode().getType(), edge.getSourceNode().getUuid() );
- String name;
- if ( CpNamingUtils.isConnectionEdgeType( edge.getType() ) ) {
- name = CpNamingUtils.getConnectionType( edge.getType() );
- }
- else {
- name = CpNamingUtils.getCollectionName( edge.getType() );
- }
+ String name = getNameFromEdgeType(edge.getType());
addMapSet( entityRefSetMap, eref, name );
}
).toBlocking().last();
@@ -398,26 +396,11 @@ public class CpRelationManager implements RelationManager {
@Override
public void call( final Edge edge ) {
- EntityRef sourceEntity =
- new SimpleEntityRef( edge.getSourceNode().getType(), edge.getSourceNode().getUuid() );
// reindex the entity in the source entity's collection or connection index
- IndexScope indexScope;
- if ( CpNamingUtils.isCollectionEdgeType( edge.getType() ) ) {
-
- String collName = CpNamingUtils.getCollectionName( edge.getType() );
- indexScope = new IndexScopeImpl(
- new SimpleId( sourceEntity.getUuid(), sourceEntity.getType()),
- CpNamingUtils.getCollectionScopeNameFromCollectionName( collName ));
- }
- else {
+ IndexScope indexScope = generateScopeFromSource(edge);
- String connName = CpNamingUtils.getConnectionType( edge.getType() );
- indexScope = new IndexScopeImpl(
- new SimpleId( sourceEntity.getUuid(), sourceEntity.getType() ),
- CpNamingUtils.getConnectionScopeName( connName ) );
- }
entityIndexBatch.index( indexScope, cpEntity );
@@ -551,7 +534,7 @@ public class CpRelationManager implements RelationManager {
Iterator<String> iter = str.toBlocking().getIterator();
while ( iter.hasNext() ) {
String edgeType = iter.next();
- indexes.add( CpNamingUtils.getCollectionName( edgeType ) );
+ indexes.add( getNameFromEdgeType( edgeType ) );
}
return indexes;
@@ -796,22 +779,18 @@ public class CpRelationManager implements RelationManager {
final EntityIndexBatch batch = ei.createBatch();
// remove item from collection index
- IndexScope indexScope = new IndexScopeImpl(
- cpHeadEntity.getId(),
- CpNamingUtils.getCollectionScopeNameFromCollectionName( collName ) );
+ IndexScope indexScope = generateScopeFromCollection( cpHeadEntity.getId(), collName );
batch.deindex( indexScope, memberEntity );
// remove collection from item index
- IndexScope itemScope = new IndexScopeImpl(
- memberEntity.getId(),
- CpNamingUtils.getCollectionScopeNameFromCollectionName(
- Schema.defaultCollectionName( cpHeadEntity.getId().getType() ) ) );
+ IndexScope itemScope = generateScopeFromCollection( memberEntity.getId(),
+ Schema.defaultCollectionName( cpHeadEntity.getId().getType() ) );
batch.deindex( itemScope, cpHeadEntity );
- BetterFuture future = batch.execute();
+ batch.execute();
// remove edge from collection to item
GraphManager gm = managerCache.getGraphManager( applicationScope );
@@ -905,9 +884,7 @@ public class CpRelationManager implements RelationManager {
+ "' of " + headEntity.getType() + ":" + headEntity .getUuid() );
}
- final IndexScope indexScope = new IndexScopeImpl(
- cpHeadEntity.getId(),
- CpNamingUtils.getCollectionScopeNameFromCollectionName( collName ) );
+ final IndexScope indexScope = generateScopeFromCollection( cpHeadEntity.getId(), collName );
final ApplicationEntityIndex ei = managerCache.getEntityIndex( applicationScope );
@@ -978,8 +955,7 @@ public class CpRelationManager implements RelationManager {
EntityIndexBatch batch = ei.createBatch();
// Index the new connection in app|source|type context
- IndexScope indexScope = new IndexScopeImpl( cpHeadEntity.getId(),
- CpNamingUtils.getConnectionScopeName( connectionType ) );
+ IndexScope indexScope = generateScopeFromConnection( cpHeadEntity.getId(), connectionType );
batch.index( indexScope, targetEntity );
@@ -1208,10 +1184,8 @@ public class CpRelationManager implements RelationManager {
final EntityIndexBatch batch = ei.createBatch();
// Deindex the connection in app|source|type context
- IndexScope indexScope = new IndexScopeImpl(
- new SimpleId( connectingEntityRef.getUuid(),
- connectingEntityRef.getType() ),
- CpNamingUtils.getConnectionScopeName( connectionType ) );
+ final Id cpId = createId( connectingEntityRef );
+ IndexScope indexScope = generateScopeFromConnection( cpId, connectionType );
batch.deindex( indexScope, targetEntity );
// Deindex the connection in app|source|type context
@@ -1334,8 +1308,7 @@ public class CpRelationManager implements RelationManager {
headEntity = em.validate( headEntity );
- final IndexScope indexScope = new IndexScopeImpl( cpHeadEntity.getId(),
- CpNamingUtils.getConnectionScopeName( connection ) );
+ final IndexScope indexScope = generateScopeFromConnection( cpHeadEntity.getId(), connection );
final SearchTypes searchTypes = SearchTypes.fromNullableTypes( query.getEntityType() );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fbce62df/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java
index c14447d..3f2c9d6 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/CpWalker.java
@@ -36,6 +36,8 @@ import rx.functions.Action1;
import rx.functions.Func1;
import rx.schedulers.Schedulers;
+import static org.apache.usergrid.corepersistence.util.CpNamingUtils.getNameFromEdgeType;
+
/**
* Takes a visitor to all collections and entities.
@@ -128,7 +130,7 @@ public class CpWalker {
if ( entity == null ) {
return;
}
- String collName = CpNamingUtils.getCollectionName( edgeValue.getType() );
+ String collName = getNameFromEdgeType( edgeValue.getType() );
visitor.visitCollectionEntry( em, collName, entity );
} ).subscribeOn( Schedulers.io() );
}, 100 );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fbce62df/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityDeletedHandler.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityDeletedHandler.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityDeletedHandler.java
index 78c1ca7..57d69bc 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityDeletedHandler.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityDeletedHandler.java
@@ -65,21 +65,22 @@ public class EntityDeletedHandler implements EntityDeleted {
return;
}
-
-
- if(logger.isDebugEnabled()) {
- logger.debug(
- "Handling deleted event for entity {}:{} v {} " + " app: {}",
- new Object[] {
- entityId.getType(), entityId.getUuid(), version,
- scope.getApplication()
- } );
- }
-
- CpEntityManagerFactory cpemf = ( CpEntityManagerFactory ) emf;
- final ApplicationEntityIndex ei = cpemf.getManagerCache().getEntityIndex( scope );
-
- throw new NotImplementedException( "Fix this" );
+// This is a NO-OP now, it's handled by the EntityVersionDeletedHandler
+
+//
+// if(logger.isDebugEnabled()) {
+// logger.debug(
+// "Handling deleted event for entity {}:{} v {} " + " app: {}",
+// new Object[] {
+// entityId.getType(), entityId.getUuid(), version,
+// scope.getApplication()
+// } );
+// }
+//
+// CpEntityManagerFactory cpemf = ( CpEntityManagerFactory ) emf;
+// final ApplicationEntityIndex ei = cpemf.getManagerCache().getEntityIndex( scope );
+
+// throw new NotImplementedException( "Fix this" );
//read all edges to this node and de-index them
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fbce62df/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionCreatedHandler.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionCreatedHandler.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionCreatedHandler.java
index c000500..0163fc2 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionCreatedHandler.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionCreatedHandler.java
@@ -55,7 +55,7 @@ public class EntityVersionCreatedHandler implements EntityVersionCreated {
@Override
public void versionCreated( final ApplicationScope scope, final Entity entity ) {
- //not op, we're not migrating properly to this. Make this an event
+ //not op, we're not migrating properly to this. Make this an event At the moment this is happening on write
// // This check is for testing purposes and for a test that to be able to dynamically turn
// // off and on delete previous versions so that it can test clean-up on read.
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fbce62df/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java
index 4fa5ce1..a2e9b30 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/events/EntityVersionDeletedHandler.java
@@ -24,18 +24,30 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.usergrid.corepersistence.CpEntityManagerFactory;
-import org.apache.usergrid.exception.NotImplementedException;
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
import org.apache.usergrid.persistence.EntityManagerFactory;
import org.apache.usergrid.persistence.collection.MvccLogEntry;
import org.apache.usergrid.persistence.collection.event.EntityVersionDeleted;
+import org.apache.usergrid.persistence.collection.serialization.SerializationFig;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.graph.GraphManager;
+import org.apache.usergrid.persistence.graph.serialization.EdgesObservable;
import org.apache.usergrid.persistence.index.ApplicationEntityIndex;
+import org.apache.usergrid.persistence.index.EntityIndexBatch;
+import org.apache.usergrid.persistence.index.IndexBatchBuffer;
+import org.apache.usergrid.persistence.index.IndexScope;
+import org.apache.usergrid.persistence.index.impl.IndexScopeImpl;
import org.apache.usergrid.persistence.model.entity.Id;
import com.google.inject.Inject;
import com.google.inject.Singleton;
+import rx.Observable;
+
import static org.apache.usergrid.corepersistence.CoreModule.EVENTS_DISABLED;
+import static org.apache.usergrid.corepersistence.util.CpNamingUtils.generateScopeFromSource;
+import static org.apache.usergrid.corepersistence.util.CpNamingUtils.generateScopeToTarget;
/**
@@ -49,10 +61,17 @@ public class EntityVersionDeletedHandler implements EntityVersionDeleted {
private final EntityManagerFactory emf;
+ private final EdgesObservable edgesObservable;
+ private final SerializationFig serializationFig;
@Inject
- public EntityVersionDeletedHandler( final EntityManagerFactory emf ) {this.emf = emf;}
+ public EntityVersionDeletedHandler( final EntityManagerFactory emf, final EdgesObservable edgesObservable,
+ final SerializationFig serializationFig ) {
+ this.emf = emf;
+ this.edgesObservable = edgesObservable;
+ this.serializationFig = serializationFig;
+ }
@Override
@@ -67,32 +86,60 @@ public class EntityVersionDeletedHandler implements EntityVersionDeleted {
}
if ( logger.isDebugEnabled() ) {
- logger.debug( "Handling versionDeleted count={} event for entity {}:{} v {} "
- + " app: {}", new Object[] {
- entityVersions.size(), entityId.getType(), entityId.getUuid(),
- scope.getApplication()
- } );
+ logger.debug( "Handling versionDeleted count={} event for entity {}:{} v {} " + " app: {}", new Object[] {
+ entityVersions.size(), entityId.getType(), entityId.getUuid(), scope.getApplication()
+ } );
}
CpEntityManagerFactory cpemf = ( CpEntityManagerFactory ) emf;
final ApplicationEntityIndex ei = cpemf.getManagerCache().getEntityIndex( scope );
+ final GraphManager gm = cpemf.getManagerCache().getGraphManager( scope );
+
+
+ //create an observable of all scopes to deIndex
+ //remove all indexes pointing to this
+ final Observable<IndexScope> targetScopes = edgesObservable.edgesToTarget( gm, entityId ).map(
+ edge -> generateScopeFromSource( edge) );
+
+
+ //Remove all double indexes
+ final Observable<IndexScope> sourceScopes = edgesObservable.edgesFromSource( gm, entityId ).map(
+ edge -> generateScopeToTarget( edge ) );
+
+
+ //create a stream of scopes
+ final Observable<IndexScopeVersion> versions = Observable.merge( targetScopes, sourceScopes ).flatMap(
+ indexScope -> Observable.from( entityVersions )
+ .map( version -> new IndexScopeVersion( indexScope, version ) ) );
+
+ //create a set of batches
+ final Observable<EntityIndexBatch> batches = versions.buffer( serializationFig.getBufferSize() ).flatMap(
+ bufferedVersions -> Observable.from( bufferedVersions ).collect( () -> ei.createBatch(),
+ ( EntityIndexBatch batch, IndexScopeVersion version ) -> {
+ //deindex in this batch
+ batch.deindex( version.scope, version.version.getEntityId(), version.version.getVersion() );
+ } ) );
- throw new NotImplementedException( "Fix this" );
+ //execute the batches. lastOrDefault is used because the stream is empty when the entity has no edges
+ //(or no versions), and last() would throw NoSuchElementException in that case
+ batches.doOnNext( batch -> batch.execute() ).toBlocking().lastOrDefault( null );
-// final IndexScope indexScope =
-// new IndexScopeImpl( new SimpleId( scope.getOwner().getUuid(), scope.getOwner().getType() ),
-// scope.getName() );
-//
-// //create our batch, and then collect all of them into a single batch
-// Observable.from( entityVersions ).collect( () -> ei.createBatch(), ( entityIndexBatch, mvccLogEntry ) -> {
-// entityIndexBatch.deindex( indexScope, mvccLogEntry.getEntityId(), mvccLogEntry.getVersion() );
-// } )
-// //after our batch is collected, execute it
-// .doOnNext( entityIndexBatch -> {
-// entityIndexBatch.execute();
-// } ).toBlocking().last();
+ }
+
+
+
+
+
+ private static final class IndexScopeVersion{
+ private final IndexScope scope;
+ private final MvccLogEntry version;
+
+
+ private IndexScopeVersion( final IndexScope scope, final MvccLogEntry version ) {
+ this.scope = scope;
+ this.version = version;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fbce62df/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/FilteringLoader.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/FilteringLoader.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/FilteringLoader.java
index 2eb6675..566656b 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/FilteringLoader.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/results/FilteringLoader.java
@@ -119,8 +119,7 @@ public class FilteringLoader implements ResultsLoader {
final CandidateResult currentCandidate = iter.next();
- final String collectionType = CpNamingUtils.getCollectionScopeNameFromEntityType(
- currentCandidate.getId().getType() );
+ final String collectionType = currentCandidate.getId().getType() ;
final Id entityId = currentCandidate.getId();
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fbce62df/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java
index 2e9fb55..652742b 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/util/CpNamingUtils.java
@@ -21,15 +21,21 @@ package org.apache.usergrid.corepersistence.util;
import java.util.UUID;
+import org.apache.usergrid.persistence.EntityRef;
import org.apache.usergrid.persistence.Schema;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.core.scope.ApplicationScopeImpl;
import org.apache.usergrid.persistence.entities.Application;
+import org.apache.usergrid.persistence.graph.Edge;
+import org.apache.usergrid.persistence.index.IndexScope;
+import org.apache.usergrid.persistence.index.impl.IndexScopeImpl;
import org.apache.usergrid.persistence.map.MapScope;
import org.apache.usergrid.persistence.map.impl.MapScopeImpl;
import org.apache.usergrid.persistence.model.entity.Id;
import org.apache.usergrid.persistence.model.entity.SimpleId;
+import com.clearspring.analytics.util.Preconditions;
+
/**
* Utilises for constructing standard naming conventions for collections and connections
@@ -80,66 +86,150 @@ public class CpNamingUtils {
* @param type
* @return
*/
- public static String getCollectionScopeNameFromEntityType( String type ) {
+ private static String getCollectionScopeNameFromEntityType( String type ) {
String csn = EDGE_COLL_SUFFIX + Schema.defaultCollectionName( type );
return csn.toLowerCase();
}
- public static String getCollectionScopeNameFromCollectionName( String name ) {
+ private static String getCollectionScopeNameFromCollectionName( String name ) {
String csn = EDGE_COLL_SUFFIX + name;
return csn.toLowerCase();
}
- public static String getConnectionScopeName( String connectionType ) {
+ private static String getConnectionScopeName( String connectionType ) {
String csn = EDGE_CONN_SUFFIX + connectionType ;
return csn.toLowerCase();
}
- public static boolean isCollectionEdgeType( String type ) {
+ /**
+ * Get the index scope for the edge from the source
+ * @param edge
+ * @return
+ */
+ public static IndexScope generateScopeFromSource(final Edge edge ){
+
+
+ final Id nodeId = edge.getSourceNode();
+ final String scopeName = getNameFromEdgeType( edge.getType() );
+
+
+ return new IndexScopeImpl( nodeId, scopeName );
+
+ }
+
+
+
+
+
+ /**
+ * Get the index scope for the edge to the target
+ * @param edge
+ * @return
+ */
+ public static IndexScope generateScopeToTarget(final Edge edge ){
+
+
+
+ final Id nodeId = edge.getTargetNode();
+ final String scopeName = getNameFromEdgeType( edge.getType() );
+
+
+ return new IndexScopeImpl( nodeId, scopeName );
+
+ }
+
+
+ /**
+ * Generate either the collection name or connection name from the edgeName
+ * @param edgeName
+ * @return
+ */
+ public static String getNameFromEdgeType(final String edgeName){
+
+
+ if(isCollectionEdgeType( edgeName )){
+ return getCollectionScopeNameFromCollectionName(getCollectionName(edgeName) );
+ }
+
+ return getConnectionScopeName(getConnectionType( edgeName ) );
+
+ }
+
+
+ /**
+ * Get the index scope from the collection name
+ * @param nodeId The source or target node id
+ * @param collectionName The name of the collection. Ex "users"
+ * @return
+ */
+ public static IndexScope generateScopeFromCollection( final Id nodeId, final String collectionName ){
+ return new IndexScopeImpl( nodeId, getCollectionScopeNameFromCollectionName( collectionName ) );
+ }
+
+
+ /**
+ * Get the scope from the connection
+ * @param nodeId
+ * @param connectionName
+ * @return
+ */
+ public static IndexScope generateScopeFromConnection( final Id nodeId, final String connectionName ){
+ return new IndexScopeImpl( nodeId, getConnectionScopeName( connectionName ) );
+ }
+
+
+ /**
+ * Create an Id object from the entity ref
+ * @param entityRef
+ * @return
+ */
+ public static Id createId(final EntityRef entityRef){
+ return new SimpleId( entityRef.getUuid(), entityRef.getType() );
+ }
+
+ private static boolean isCollectionEdgeType( String type ) {
return type.startsWith( EDGE_COLL_SUFFIX );
}
- public static boolean isConnectionEdgeType( String type ) {
+ private static boolean isConnectionEdgeType( String type ) {
return type.startsWith( EDGE_CONN_SUFFIX );
}
- static public String getConnectionType( String edgeType ) {
+
+ private static String getConnectionType( String edgeType ) {
String[] parts = edgeType.split( "\\|" );
return parts[1];
}
- static public String getCollectionName( String edgeType ) {
+ private static String getCollectionName( String edgeType ) {
String[] parts = edgeType.split( "\\|" );
return parts[1];
}
+ /**
+ * Generate a standard edge name for our graph using the connection name
+ * @param connectionType The type of connection made
+ * @return
+ */
public static String getEdgeTypeFromConnectionType( String connectionType ) {
-
- if ( connectionType != null ) {
- String csn = EDGE_CONN_SUFFIX + "|" + connectionType;
- return csn;
- }
-
- return null;
+ return (EDGE_CONN_SUFFIX + "|" + connectionType).toLowerCase();
}
+ /**
+ * Generate a standard edge name for our graph using the collection name
+ * @param collectionName
+ * @return
+ */
public static String getEdgeTypeFromCollectionName( String collectionName ) {
-
- if ( collectionName != null ) {
- String csn = EDGE_COLL_SUFFIX + "|" + collectionName;
- return csn;
- }
-
-
- return null;
+ return (EDGE_COLL_SUFFIX + "|" + collectionName).toLowerCase();
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fbce62df/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
index 5c166c5..f743f0b 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/StaleIndexCleanupTest.java
@@ -52,6 +52,7 @@ import org.apache.usergrid.persistence.index.SearchTypes;
import org.apache.usergrid.persistence.index.impl.IndexScopeImpl;
import org.apache.usergrid.persistence.index.query.CandidateResults;
import org.apache.usergrid.persistence.index.query.Query;
+import org.apache.usergrid.persistence.model.entity.Id;
import org.apache.usergrid.persistence.model.entity.SimpleId;
import com.fasterxml.uuid.UUIDComparator;
@@ -60,7 +61,9 @@ import com.google.inject.Injector;
import net.jcip.annotations.NotThreadSafe;
import static org.apache.usergrid.corepersistence.CoreModule.EVENTS_DISABLED;
+import static org.apache.usergrid.corepersistence.util.CpNamingUtils.generateScopeFromCollection;
import static org.apache.usergrid.persistence.Schema.TYPE_APPLICATION;
+import static org.apache.usergrid.persistence.core.util.IdGenerator.createId;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -483,14 +486,15 @@ public class StaleIndexCleanupTest extends AbstractCoreIT {
EntityManager em = app.getEntityManager();
- EntityIndexFactory eif = SpringResource.getInstance().getBean( Injector.class ).getInstance( EntityIndexFactory.class );
+ EntityIndexFactory eif = SpringResource.getInstance().getBean( Injector.class ).getInstance(
+ EntityIndexFactory.class );
ApplicationScope as = new ApplicationScopeImpl(
new SimpleId( em.getApplicationId(), TYPE_APPLICATION ) );
ApplicationEntityIndex ei = eif.createApplicationEntityIndex(as);
- IndexScope is = new IndexScopeImpl( new SimpleId( em.getApplicationId(), TYPE_APPLICATION ),
- CpNamingUtils.getCollectionScopeNameFromCollectionName( collName ) );
+ final Id rootId = createId(em.getApplicationId(), TYPE_APPLICATION);
+ IndexScope is = generateScopeFromCollection(rootId, collName );
Query rcq = Query.fromQL( query );
// TODO: why does this have no effect; max we ever get is 1000 entities
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fbce62df/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservableIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservableIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservableIT.java
index 50c2cd9..3bfe460 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservableIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesFromSourceObservableIT.java
@@ -41,6 +41,7 @@ import org.apache.usergrid.persistence.graph.Edge;
import org.apache.usergrid.persistence.graph.GraphManager;
import org.apache.usergrid.persistence.model.entity.Id;
import org.apache.usergrid.persistence.model.entity.SimpleId;
+import org.apache.usergrid.utils.EdgeTestUtils;
import com.google.inject.Injector;
@@ -98,8 +99,8 @@ public class EdgesFromSourceObservableIT extends AbstractCoreIT {
final Id source = edge.getSourceNode();
//test if we're a collection, if so
- if ( CpNamingUtils.isCollectionEdgeType( edgeType ) ) {
- final String collectionName = CpNamingUtils.getCollectionName( edgeType );
+ if ( EdgeTestUtils.isCollectionEdgeType( edgeType ) ) {
+ final String collectionName = EdgeTestUtils.getNameForEdge( edgeType );
assertEquals("application source returned", createdApplication.getUuid(), source.getUuid());
@@ -112,11 +113,11 @@ public class EdgesFromSourceObservableIT extends AbstractCoreIT {
- if ( !CpNamingUtils.isConnectionEdgeType( edgeType ) ) {
+ if ( !EdgeTestUtils.isConnectionEdgeType( edgeType ) ) {
fail( "Only connection edges should be encountered" );
}
- final String connectionType = CpNamingUtils.getConnectionType( edgeType );
+ final String connectionType = EdgeTestUtils.getNameForEdge( edgeType );
assertEquals( "Same connection type expected", "likes", connectionType );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fbce62df/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java
index 9f1bb17..6d228b2 100644
--- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/rx/EdgesToTargetObservableIT.java
@@ -38,6 +38,7 @@ import org.apache.usergrid.persistence.core.scope.ApplicationScope;
import org.apache.usergrid.persistence.graph.Edge;
import org.apache.usergrid.persistence.graph.GraphManager;
import org.apache.usergrid.persistence.model.entity.Id;
+import org.apache.usergrid.utils.EdgeTestUtils;
import com.google.inject.Injector;
@@ -92,29 +93,26 @@ public class EdgesToTargetObservableIT extends AbstractCoreIT {
final GraphManager gm = managerCache.getGraphManager( scope );
- edgesFromSourceObservable.edgesFromSource( gm, applicationId ).doOnNext( new Action1<Edge>() {
- @Override
- public void call( final Edge edge ) {
- final String edgeType = edge.getType();
- final Id target = edge.getTargetNode();
+ edgesFromSourceObservable.edgesFromSource( gm, applicationId ).doOnNext( edge -> {
+ final String edgeType = edge.getType();
+ final Id target = edge.getTargetNode();
- //test if we're a collection, if so remove ourselves fro the types
- if ( !CpNamingUtils.isCollectionEdgeType( edgeType ) ) {
- fail( "Connections should be the only type encountered" );
- }
+ //test if we're a collection, if so remove ourselves from the types
+ if ( !EdgeTestUtils.isCollectionEdgeType( edgeType ) ) {
+ fail( "Connections should be the only type encountered" );
+ }
- final String collectionType = CpNamingUtils.getCollectionName( edgeType );
+ final String collectionType = EdgeTestUtils.getNameForEdge( edgeType );
- if ( collectionType.equals( type1 ) ) {
- assertTrue( "Element should be present on removal", type1Identities.remove( target ) );
- }
- else if ( collectionType.equals( type2 ) ) {
- assertTrue( "Element should be present on removal", type2Identities.remove( target ) );
- }
+ if ( collectionType.equals( type1 ) ) {
+ assertTrue( "Element should be present on removal", type1Identities.remove( target ) );
+ }
+ else if ( collectionType.equals( type2 ) ) {
+ assertTrue( "Element should be present on removal", type2Identities.remove( target ) );
+ }
- }
} ).toBlocking().lastOrDefault( null );
@@ -124,23 +122,20 @@ public class EdgesToTargetObservableIT extends AbstractCoreIT {
//test connections
- edgesFromSourceObservable.edgesFromSource( gm, source).doOnNext( new Action1<Edge>() {
- @Override
- public void call( final Edge edge ) {
- final String edgeType = edge.getType();
- final Id target = edge.getTargetNode();
+ edgesFromSourceObservable.edgesFromSource( gm, source).doOnNext( edge -> {
+ final String edgeType = edge.getType();
+ final Id target = edge.getTargetNode();
- if ( !CpNamingUtils.isConnectionEdgeType( edgeType ) ) {
- fail( "Only connection edges should be encountered" );
- }
+ if ( !EdgeTestUtils.isConnectionEdgeType( edgeType ) ) {
+ fail( "Only connection edges should be encountered" );
+ }
- final String connectionType = CpNamingUtils.getConnectionType( edgeType );
+ final String connectionType = EdgeTestUtils.getNameForEdge( edgeType );
- assertEquals( "Same connection type expected", "likes", connectionType );
+ assertEquals( "Same connection type expected", "likes", connectionType );
- assertTrue( "Element should be present on removal", connections.remove( target ) );
- }
+ assertTrue( "Element should be present on removal", connections.remove( target ) );
} ).toBlocking().lastOrDefault( null );
assertEquals( "Every connection should have been encountered", 0, connections.size() );
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fbce62df/stack/core/src/test/java/org/apache/usergrid/utils/EdgeTestUtils.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/utils/EdgeTestUtils.java b/stack/core/src/test/java/org/apache/usergrid/utils/EdgeTestUtils.java
new file mode 100644
index 0000000..f217338
--- /dev/null
+++ b/stack/core/src/test/java/org/apache/usergrid/utils/EdgeTestUtils.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.usergrid.utils;
+
+
+import org.apache.usergrid.corepersistence.util.CpNamingUtils;
+
+import static org.junit.Assert.assertEquals;
+
+
+public class EdgeTestUtils {
+
+    /**
+     * Get the name for an edge: the second '|'-separated part of the edge type.
+     */
+    public static String getNameForEdge( final String edgeName ) {
+        final String[] parts = edgeName.split( "\\|" );
+        // JUnit's contract is (message, expected, actual): expected value goes first
+        assertEquals( "there should be 2 parts", 2, parts.length );
+
+        return parts[1];
+    }
+
+    /** Returns true when the edge type starts with CpNamingUtils.EDGE_COLL_SUFFIX. */
+    public static boolean isCollectionEdgeType( String type ) {
+        return type.startsWith( CpNamingUtils.EDGE_COLL_SUFFIX );
+    }
+
+    /** Returns true when the edge type starts with CpNamingUtils.EDGE_CONN_SUFFIX. */
+    public static boolean isConnectionEdgeType( String type ) {
+        return type.startsWith( CpNamingUtils.EDGE_CONN_SUFFIX );
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/fbce62df/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ObservableIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ObservableIterator.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ObservableIterator.java
index 2bd1edb..84a7fc3 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ObservableIterator.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/rx/ObservableIterator.java
@@ -53,6 +53,8 @@ public abstract class ObservableIterator<T> implements Observable.OnSubscribe<T>
try {
+ subscriber.onStart();
+
//get our iterator and push data to the observer
final Iterator<T> itr = getIterator();